1313# limitations under the License.
1414"""End to end tests of Timesketch client functionality."""
1515
16+ import uuid
1617import json
18+ import random
19+ import os
1720import time
1821
1922from timesketch_api_client import search
@@ -26,6 +29,8 @@ class ClientTest(interface.BaseEndToEndTest):
2629 """End to end tests for client functionality."""
2730
2831 NAME = "client_test"
32+ RULEID1 = str (uuid .uuid4 ())
33+ RULEID2 = str (uuid .uuid4 ())
2934
3035 def test_client (self ):
3136 """Client tests."""
@@ -87,9 +92,10 @@ def test_direct_opensearch(self):
8792
8893 def test_sigmarule_create (self ):
8994 """Create a Sigma rule in database"""
90- MOCK_SIGMA_RULE = """
95+
96+ MOCK_SIGMA_RULE = f"""
9197title: Suspicious Installation of bbbbbb
92- id: 5266a592-b793-11ea-b3de-bbbbbb
98+ id: { self . RULEID1 }
9399description: Detects suspicious installation of bbbbbb
94100references:
95101 - https://rmusser.net/docs/ATT&CK-Stuff/ATT&CK/Discovery.html
@@ -124,9 +130,9 @@ def test_sigmarule_create_get(self):
124130 """Client Sigma object tests."""
125131
126132 rule = self .api .create_sigmarule (
127- rule_yaml = """
133+ rule_yaml = f """
128134title: Suspicious Installation of eeeee
129- id: 5266a592-b793-11ea-b3de-eeeee
135+ id: { self . RULEID2 }
130136description: Detects suspicious installation of eeeee
131137references:
132138 - https://rmusser.net/docs/ATT&CK-Stuff/ATT&CK/Discovery.html
@@ -148,17 +154,17 @@ def test_sigmarule_create_get(self):
148154 )
149155 self .assertions .assertIsNotNone (rule )
150156
151- rule = self .api .get_sigmarule (rule_uuid = "5266a592-b793-11ea-b3de-eeeee" )
152- rule .from_rule_uuid ("5266a592-b793-11ea-b3de-eeeee" )
157+ rule = self .api .get_sigmarule (rule_uuid = self . RULEID2 )
158+ rule .from_rule_uuid (self . RULEID2 )
153159 self .assertions .assertGreater (len (rule .attributes ), 5 )
154160 self .assertions .assertIsNotNone (rule )
155161 self .assertions .assertIn ("Alexander" , rule .author )
156162 self .assertions .assertIn ("Alexander" , rule .get_attribute ("author" ))
157- self .assertions .assertIn ("b793-11ea-b3de-eeeee" , rule .id )
163+ self .assertions .assertIn (self . RULEID2 , rule .id )
158164 self .assertions .assertIn ("Installation of eeeee" , rule .title )
159165 self .assertions .assertIn ("zmap" , rule .search_query )
160166 self .assertions .assertIn ("shell:zsh:history" , rule .search_query )
161- self .assertions .assertIn ("sigmarules/5266a592" , rule .resource_uri )
167+ self .assertions .assertIn (self . RULEID2 , rule .resource_uri )
162168 self .assertions .assertIn ("installation of eeeee" , rule .description )
163169 self .assertions .assertIn ("high" , rule .level )
164170 self .assertions .assertEqual (len (rule .falsepositives ), 1 )
@@ -182,14 +188,14 @@ def test_sigmarule_remove(self):
182188 """Client Sigma delete tests.
183189 The test is called remove to avoid running it before the create test.
184190 """
185- rule = self .api .get_sigmarule (rule_uuid = "5266a592-b793-11ea-b3de-eeeee" )
191+ rule = self .api .get_sigmarule (rule_uuid = self . RULEID1 )
186192 self .assertions .assertGreater (len (rule .attributes ), 5 )
187193 rule .delete ()
188194
189195 rules = self .api .list_sigmarules ()
190196 self .assertions .assertGreaterEqual (len (rules ), 1 )
191197
192- rule = self .api .get_sigmarule (rule_uuid = "5266a592-b793-11ea-b3de-bbbbbb" )
198+ rule = self .api .get_sigmarule (rule_uuid = self . RULEID2 )
193199 self .assertions .assertGreater (len (rule .attributes ), 5 )
194200 rule .delete ()
195201 rules = self .api .list_sigmarules ()
@@ -402,5 +408,131 @@ def test_modify_sketch_with_empty_name(self):
402408 sketch .description , "test_modify_sketch_with_empty_name"
403409 )
404410
411+ def test_list_timelines (self ):
412+ """Test listing timelines in a sketch."""
413+ # Create a new sketch
414+ sketch = self .api .create_sketch (
415+ name = "test_list_timelines" , description = "test_list_timelines"
416+ )
417+
418+ # Import a timeline into the sketch
419+ self .import_timeline ("evtx.plaso" , sketch = sketch )
420+
421+ # List the timelines in the sketch
422+ timelines = sketch .list_timelines ()
423+
424+ # Check that there is at least one timeline
425+ self .assertions .assertGreaterEqual (len (timelines ), 1 )
426+
427+ # Check that the timeline has a name
428+ for timeline in timelines :
429+ self .assertions .assertTrue (timeline .name )
430+
431+ # Check that the timeline has an index name
432+ for timeline in timelines :
433+ self .assertions .assertTrue (timeline .index_name )
434+
435+ # Check that the timeline has an ID
436+ for timeline in timelines :
437+ self .assertions .assertTrue (timeline .id )
438+
439+ # Import a second timeline into the sketch
440+ self .import_timeline ("evtx_part.csv" , sketch = sketch )
441+
442+ _ = sketch .lazyload_data (refresh_cache = True )
443+
444+ # List the timelines in the sketch
445+ timelines = sketch .list_timelines ()
446+
447+ # Check that there are two timelines
448+ self .assertions .assertEqual (len (timelines ), 2 )
449+
450+ def test_delete_timeline (self ):
451+ """Test deleting a timeline.
452+ This test verifies the following:
453+ - A new sketch can be created.
454+ - A timeline can be imported into the sketch.
455+ - The timeline's name, index name, and index status are correct.
456+ - The number of events in the sketch is correct
457+ after importing the timeline.
458+ - A second timeline can be imported into the sketch.
459+ - The total number of events in the sketch is correct after
460+ importing the second timeline.
461+ - A timeline can be deleted.
462+ - The number of events in the sketch is correct after deleting
463+ a timeline.
464+ - The number of timelines in the sketch is correct after
465+ deleting a timeline.
466+ Raises:
467+ AssertionError: If any of the assertions fail.
468+ RuntimeError: If the event creation fails.
469+ RuntimeError: If the sketch is not found.
470+ """
471+
472+ # create a new sketch
473+ rand = random .randint (0 , 10000 )
474+ sketch = self .api .create_sketch (
475+ name = f"test_delete_timeline { rand } " , description = "test_delete_timeline"
476+ )
477+ self .sketch = sketch
478+
479+ file_path = (
480+ "/usr/local/src/timesketch/end_to_end_tests/test_data/sigma_events.jsonl"
481+ )
482+
483+ self .import_timeline (file_path , index_name = rand , sketch = sketch )
484+ timeline = sketch .list_timelines ()[0 ]
485+ # check that timeline was uploaded correctly
486+ self .assertions .assertEqual (timeline .name , file_path )
487+ self .assertions .assertEqual (timeline .index .name , str (rand ))
488+ self .assertions .assertEqual (timeline .index .status , "ready" )
489+ self .assertions .assertEqual (len (sketch .list_timelines ()), 1 )
490+
491+ events = sketch .explore ("*" , as_pandas = True )
492+ self .assertions .assertEqual (len (events ), 4 )
493+
494+ # second import
495+
496+ file_path = "/tmp/second.csv"
497+
498+ with open (file_path , "w" , encoding = "utf-8" ) as file_object :
499+ file_object .write (
500+ '"message","timestamp","datetime","timestamp_desc","data_type"\n '
501+ )
502+
503+ for i in range (5 ):
504+ # write a line with random values for message
505+ string = (
506+ f'"CSV Count: { i } { rand } ","123456789",'
507+ '"2015-07-24T19:01:01+00:00","Write time","foobarcsv"\n '
508+ )
509+ file_object .write (string )
510+
511+ self .import_timeline ("/tmp/second.csv" , index_name = "second" , sketch = sketch )
512+ os .remove (file_path )
513+ # refresh data after import
514+ _ = sketch .lazyload_data (refresh_cache = True )
515+
516+ timeline = sketch .list_timelines ()[0 ]
517+ self .assertions .assertEqual (len (sketch .list_timelines ()), 2 )
518+
519+ # Check that there are 9 (5+4) events in total
520+ search_client = search .Search (sketch )
521+ search_response = json .loads (search_client .json )
522+ self .assertions .assertEqual (len (search_response ["objects" ]), 9 )
523+
524+ events = sketch .explore ("*" , as_pandas = True )
525+ self .assertions .assertEqual (len (events ), 9 )
526+
527+ # delete timeline 1
528+ # now it should be 5 events in one timeline
529+ timeline .delete ()
530+ events = sketch .explore ("*" , as_pandas = True )
531+ self .assertions .assertEqual (len (events ), 5 )
532+
533+ # check number of timelines
534+ _ = sketch .lazyload_data (refresh_cache = True )
535+ self .assertions .assertEqual (len (sketch .list_timelines ()), 1 )
536+
405537
406538manager .EndToEndTestManager .register_test (ClientTest )
0 commit comments