`import time
import random
import threading
from multiprocessing import Process, Manager
from UltraDict import UltraDict
class A():
def init(self, name, dictname):
self.name = name
self.dictname = dictname
self.personnum = -1
self.time = time.time()
self.task = threading.Thread(target=self.a)
def a(self):
while True:
self.personnum = random.random()
self.time = time.time()
other = UltraDict(name=self.dictname)
other[self.name] = {"time": self.time, "num": self.personnum}
time.sleep(1)
def start(self):
self.task.start()
class ProcessClass():
def init(self, i, name):
self.dict1 = {}
self.dictname = name
self._process = Process(target=self.run, args=(i,))
def run(self, i):
for j in range(5):
# 每个 A 对象与共享字典关联
a = A(str(i) + str(j), self.dictname)
a.start()
self.dict1[str(i) + str(j)] = a
while True:
time.sleep(1) # 保持进程运行
def start(self):
self._process.start()
if name == 'main':
ultra = UltraDict(name='getpic', shared_lock=True)
name = ultra.name
processes = []
for i in range(5):
process_instance = ProcessClass(i, name)
process_instance.start()
processes.append(process_instance)
while True:
time.sleep(2) # 每隔2秒打印一次
print(ultra)
# print("实时获取的数据:", dict(shared_dict))`
UltraDict.Exceptions.CannotAttachSharedMemory: Could not get memory 'wnsm_cfd5be17'