forked from snstac/pytak
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend.py
More file actions
202 lines (162 loc) · 6.69 KB
/
send.py
File metadata and controls
202 lines (162 loc) · 6.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env python3
import os
import asyncio
import xml.etree.ElementTree as ET
import uuid
from zipPackage import zipPackage
from configparser import ConfigParser
import pytak
# Module-level registry of contacts seen so far. MyReceiver.handle_data
# appends the `uid` attribute of each new contact event here, and
# upload_file_create sends one upload per entry in this list.
callsigns = []
from requests_pkcs12 import post
def upload_file_create(upload_name, mime_type, config, filename=None, data=None):
    """Upload a data package to the TAK server once per known contact.

    The payload is read from ``filename`` when one is given; otherwise the
    caller-supplied ``data`` is used. One POST to
    ``{MARTI_URL}/Marti/sync/missioncreate`` is issued for every entry in the
    module-level ``callsigns`` list, and the status code and body of each
    response are printed.

    Args:
        upload_name: Accepted for interface compatibility; currently unused
            (the upload is always named ``package.zip``).
        mime_type: Accepted for interface compatibility; currently unused
            (the file part is always sent as ``application/zip``).
        config: Object exposing ``get`` for the keys ``MARTI_URL``,
            ``USER_CERT_P12`` and ``PYTAK_TLS_CLIENT_PASSWORD``.
        filename: Path of the file to upload. Takes precedence over ``data``.
        data: Payload to upload when no ``filename`` is given.

    Raises:
        ValueError: If neither ``filename`` nor ``data`` supplies a payload.
    """
    if filename is not None:
        # Only mention the file when there is one; concatenating None here
        # used to raise TypeError before the "no data" check could run.
        print("Sending file package: " + filename)
        with open(filename, 'rb') as package:
            data = package.read()
    if data is None:
        raise ValueError('tak_server_sdk.upload_file: No data to send')

    marti_url = config.get("MARTI_URL")
    for contact in callsigns:
        print(f"Contact: {contact}")
        response = post(
            f'{marti_url}/Marti/sync/missioncreate',
            files={'assetfile': ("package.zip", data, 'application/zip')},
            data={'filename': "package.zip", 'contacts': contact},
            pkcs12_filename=config.get("USER_CERT_P12"),
            pkcs12_password=config.get("PYTAK_TLS_CLIENT_PASSWORD"),
            # NOTE(review): verify=False presumably disables TLS server
            # certificate verification for this request - confirm this is
            # intended outside of a test environment.
            verify=False
        )
        # Report the outcome for this contact before moving to the next one.
        print("$$$$$$$$$$$$$$$$$$$$$$$ Send complete.")
        print(response.status_code)
        print(response.text)
def gen_cot(uid, lat, lon, config, attitude="x", dimension="G", uid_type="UUID"):
    """Build a CoT event and return it serialized as XML bytes.

    The event carries a ``point`` at the given coordinates and a ``detail``
    element holding a ``contact`` (callsign taken from ``config``) and a
    ``remarks`` note.

    Args:
        uid: Value for the event's ``uid`` attribute.
        lat: Value for the point's ``lat`` attribute.
        lon: Value for the point's ``lon`` attribute.
        config: Object exposing ``get``; the ``CALLSIGN`` key is read.
        attitude: Unused; the event type is currently fixed.
        dimension: Unused; the event type is currently fixed.
        uid_type: Unused.

    Returns:
        The serialized event as produced by ``ET.tostring``.
    """
    # The event type is hard-coded to "b-i-x-i"; an "a-{attitude}-{dimension}"
    # type built from the parameters was used previously and is disabled.
    event = ET.Element(
        "event",
        attrib={
            "version": "2.0",
            "type": "b-i-x-i",
            "uid": uid,
            "how": "h-g-i-g-o",
            "time": pytak.cot_time(),
            "start": pytak.cot_time(),
            # 60 is the stale offset handed to pytak.cot_time (seconds after
            # 'start', per the original author's note).
            "stale": pytak.cot_time(60),
        },
    )

    ET.SubElement(
        event,
        "point",
        lat=lat,
        lon=lon,
        hae='-32.898',
        ce="9999999.0",
        le="9999999.0",
    )

    detail_node = ET.SubElement(event, "detail")
    ET.SubElement(detail_node, 'contact', callsign=config.get("CALLSIGN"))
    note = ET.SubElement(detail_node, "remarks")
    note.text = f"Generated by pytak at {pytak.cot_time()}"

    return ET.tostring(event)
class MySender(pytak.QueueWorker):
    """Transmit-side worker that places CoT events on the TX queue.

    Periodic event generation is currently disabled, so ``run`` only keeps
    the task alive; ``handle_data`` remains available for enqueuing events.
    """

    async def handle_data(self, data):
        """Print ``data`` and put it, unchanged, on the queue."""
        print(data)
        await self.put_queue(data)

    async def run(self, number_of_iterations=-1):
        """Loop forever, sleeping five seconds per iteration.

        ``number_of_iterations`` is accepted but not used. The loop body once
        generated a CoT event with ``gen_cot`` and passed it to
        ``handle_data``; that code is disabled.
        """
        while True:
            await asyncio.sleep(5)
class MyReceiver(pytak.QueueWorker):
    """Receive-side worker that reacts to contact events from the RX queue."""

    async def handle_data(self, data):
        """Inspect one received CoT payload and react to new contacts.

        The payload is decoded, logged and parsed as XML. If it contains a
        ``contact`` element, the event's ``uid`` is recorded in the
        module-level ``callsigns`` list the first time it is seen, which also
        triggers ``setup_and_send``. Payloads that are not well-formed XML
        are logged and skipped instead of raising, so a single bad message
        does not end the receive loop.

        Args:
            data: Raw payload taken from the receive queue; it must support
                ``decode()``.
        """
        xml_data = data.decode()
        self._logger.info("Received:\n%s\n", xml_data)

        try:
            root = ET.fromstring(xml_data)
        except ET.ParseError as exc:
            # A malformed message must not propagate out of run().
            self._logger.warning("Skipping unparsable CoT payload: %s", exc)
            return

        contact_element = root.find('.//contact')
        if contact_element is None:
            print("This is not a contact.")
            return

        print("This is a contact.")
        callsign = contact_element.get('callsign')
        endpoint = contact_element.get('endpoint')
        uid = root.get('uid')

        if uid not in callsigns:
            callsigns.append(uid)
            # NOTE(review): setup_and_send is a synchronous call made from a
            # coroutine; it presumably blocks the event loop while it does
            # file and network I/O - confirm whether that is acceptable.
            setup_and_send(self.config)

        print(f"Contact Callsign: {callsign}")
        print(f"Contact Endpoint: {endpoint}")
        print(f"Contact UID: {uid}")

    async def run(self):  # pylint: disable=arguments-differ
        """Read from the receive queue forever, handing each item to the handler."""
        while True:
            # Each received CoT payload arrives through the worker's queue.
            data = await self.queue.get()
            await self.handle_data(data)
def setup_and_send(config):
    """Assemble a data package (CoT message plus image) on disk and upload it.

    Creates ``<ATTACHMENTS_DIR>/<package uuid>/`` with two sub-directories:
    one named after the package UUID holding the ``.cot`` file, and one named
    after a separate image UUID holding a copy of ``image.png`` from the
    current working directory. The package is then zipped with ``zipPackage``
    and passed to ``upload_file_create``.

    Args:
        config: Object exposing ``get``; ``ATTACHMENTS_DIR`` is read here and
            the object is forwarded to ``gen_cot`` and ``upload_file_create``.
    """
    attachment_dir = config.get("ATTACHMENTS_DIR")
    new_package_uuid = str(uuid.uuid4())
    print("New UUID", new_package_uuid)

    # Package root directory.
    directory = os.path.join(attachment_dir, new_package_uuid)
    print("Directory: " + directory)
    os.makedirs(directory, exist_ok=True)

    # The CoT directory has the same name as its parent, one level deeper.
    cot_directory = os.path.join(directory, new_package_uuid)
    os.makedirs(cot_directory, exist_ok=True)

    # The image lives in a sibling directory named after its own UUID.
    image_uuid = uuid.uuid4().hex
    image_file_directory = os.path.join(directory, image_uuid)
    os.makedirs(image_file_directory, exist_ok=True)

    # The CoT message reuses the package UUID as its uid and file name.
    cot_uuid = new_package_uuid
    cot_content = gen_cot(uid=cot_uuid, lat="38.3417207", lon="-123.8691829", config=config)
    print(f"Content made: {cot_content}")
    # Context managers ensure the handles are closed even if a write fails.
    with open(os.path.join(cot_directory, f'{cot_uuid}.cot'), 'w') as cot_file:
        cot_file.write(cot_content.decode())

    # Copy the image content into place.
    image_path = os.path.join(image_file_directory, f'{image_uuid}.png')
    with open('image.png', 'rb') as source_image, open(image_path, 'wb') as image_file:
        image_file.write(source_image.read())

    # The callsign is hard-coded here; config.get("CALLSIGN") was used
    # previously and is disabled.
    zip_file = zipPackage(new_package_uuid, callsign="ROCKINGIT")
    print(f"Zip file created: {zip_file}")
    upload_file_create(
        upload_name=f"{new_package_uuid}.zip",
        mime_type="application/zip",
        config=config,
        filename=zip_file,
    )
async def main():
    """Load configuration, wire up the sender and receiver, and run them.

    Reads ``./config.ini``, takes its ``test`` section as the configuration,
    sets up a ``pytak.CLITool`` with it, registers one ``MySender`` on the TX
    queue and one ``MyReceiver`` on the RX queue, then runs all tasks.
    """
    print ("main")
    parser = ConfigParser()
    parser.read("./config.ini")
    print (parser.sections())
    config = parser["test"]

    # Initialize the worker queues and tasks.
    clitool = pytak.CLITool(config)
    await clitool.setup()
    print("Sending...")

    # Register both workers with the task list.
    workers = {
        MySender(clitool.tx_queue, config),
        MyReceiver(clitool.rx_queue, config),
    }
    clitool.add_tasks(workers)

    # Start all tasks.
    await clitool.run()
# Script entry point: run the async main() when executed directly.
if __name__ == "__main__":
    asyncio.run(main())