-
Notifications
You must be signed in to change notification settings - Fork 28
improved speed & fixed some issues #20
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,18 +2,65 @@ | |
| import random | ||
| import time | ||
| import datetime | ||
| import math | ||
| import sys | ||
|
|
||
| import numpy | ||
| import cv2 | ||
| import keyboard | ||
| import pyautogui | ||
| import shutil | ||
| from PIL import ImageGrab | ||
| from MTM import matchTemplates | ||
|
|
||
| from threading import Thread | ||
| from pprint import pprint | ||
|
|
||
| GAME_NUM = 0 | ||
| START_TIME = datetime.datetime.now() | ||
|
|
||
|
|
||
| def notInList(results, thresholdDist, newObject): | ||
| for result in results: | ||
| if isinstance(result[1], tuple): | ||
| tupleObject = result[1] | ||
| if math.hypot(newObject[0] - tupleObject[0], newObject[1] - tupleObject[1]) < thresholdDist: | ||
| return False | ||
| else: | ||
| if math.hypot(newObject[0] - result[0], newObject[1] - result[1]) < thresholdDist: | ||
| return False | ||
|
|
||
| return True | ||
|
|
||
|
|
||
| def isInList(count, coin_item): | ||
| for counter in count: | ||
| if counter[0] == coin_item: | ||
| return True | ||
| return False | ||
|
|
||
|
|
||
| def countItemValue(count): | ||
| valuecount = 0 | ||
| for counter in count: | ||
| valuecount += counter[1] | ||
|
|
||
| return valuecount | ||
|
|
||
|
|
||
| def matchTemplate(screen, template, templateName): | ||
| matches = [] | ||
| thresholdDist = 30 | ||
|
|
||
| match = cv2.matchTemplate(screen, template, cv2.TM_CCOEFF_NORMED) | ||
| locations = numpy.where(match >= .7) | ||
| for location in zip(*locations[::-1]): | ||
| if len(matches) == 0 or notInList(matches, thresholdDist, location): | ||
| matches.append((templateName, location)) | ||
|
|
||
| return matches | ||
|
|
||
|
|
||
| def mouse_click(x, y, wait=0.1): | ||
| pyautogui.click(x, y) | ||
| time.sleep(wait) | ||
|
|
@@ -41,9 +88,15 @@ def find_image(image_path, root_image_path): | |
| return box[0], box[1] | ||
|
|
||
|
|
||
| def check_image(img): | ||
| b, _ = find_image(img, screen_grab()) | ||
| return True if b is not None else False | ||
| def check_image(img, thread=False, self=None): | ||
| if thread: | ||
| while self.game_status == "running": | ||
| b, _ = find_image(img, screen_grab()) | ||
| if not b is None: | ||
| self.game_status = "ended" | ||
| else: | ||
| b, _ = find_image(img, screen_grab()) | ||
| return True if b is not None else False | ||
|
|
||
|
|
||
| def click_image(img): | ||
|
|
@@ -64,15 +117,16 @@ def setup(): | |
| print("Program was not correctly closed last time. Make sure to exit the game with CTRL+C") | ||
|
|
||
|
|
||
| def start_game(start_img_path): | ||
| def start_game(self, start_img_path): | ||
| self.game_status = "starting" | ||
|
|
||
| click_image(start_img_path) | ||
| time.sleep(2) | ||
| if not check_image("rc_items/start_game.png"): | ||
| pyautogui.moveTo(100, 100) | ||
| return True | ||
| sx, sy = find_image("rc_items/start_game.png", screen_grab()) | ||
| mouse_click(sx + 2, sy + 2, wait=0.1) | ||
| time.sleep(3) | ||
| return False | ||
|
|
||
|
|
||
|
|
@@ -82,47 +136,92 @@ def start_game_msg(name): | |
| GAME_NUM += 1 | ||
|
|
||
|
|
||
| def end_game(): | ||
| while not check_image("rc_items/gain_power.png"): | ||
| time.sleep(1) | ||
| click_image("rc_items/gain_power.png") | ||
| def end_game(self, fail=False): | ||
| if not fail: | ||
| self.game_status = "idle" | ||
|
|
||
| keyboard.press_and_release("page up") | ||
| keyboard.press_and_release("down") | ||
|
|
||
| while not check_image("rc_items/gain_power.png"): | ||
| time.sleep(1) | ||
| click_image("rc_items/gain_power.png") | ||
|
|
||
| time.sleep(3) | ||
|
|
||
| keyboard.press_and_release("page up") | ||
| time.sleep(2) | ||
| click_image("rc_items/goto_games.png") | ||
| time.sleep(2) | ||
|
|
||
| keyboard.press_and_release("page up") | ||
| time.sleep(0.5) | ||
| click_image("rc_items/goto_games.png") | ||
| time.sleep(0.5) | ||
| if check_image("rc_items/collect_pc.png"): | ||
| click_image("rc_items/click_image") | ||
| if check_image("rc_items/collect_pc.png"): | ||
| click_image("rc_items/click_image") | ||
| else: | ||
| keyboard.press_and_release("page up") | ||
| time.sleep(2) | ||
|
|
||
| click_image("rc_items/goto_games.png") | ||
|
|
||
| os.execv(sys.executable, ['python'] + sys.argv) #restart script | ||
|
|
||
|
|
||
| class ThreadWithReturnValue(Thread): | ||
| def __init__(self, group=None, target=None, name=None, | ||
| args=(), kwargs={}, Verbose=None): | ||
| Thread.__init__(self, group, target, name, args, kwargs) | ||
| self._return = None | ||
|
|
||
| def run(self): | ||
| if self._target is not None: | ||
| self._return = self._target(*self._args, | ||
| **self._kwargs) | ||
|
|
||
| def join(self, *args): | ||
| Thread.join(self, *args) | ||
| return self._return | ||
|
Comment on lines
+168
to
+181
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. method for thread with return value (return value will be the matches of CoinFlip) |
||
|
|
||
|
|
||
| class Bot2048: | ||
| def __init__(self): | ||
| self.start_img_path = "rc_items/2048_gameimg.png" | ||
| self.available_moves = ["right", "left", "up", "down"] | ||
| self.game = "2048" | ||
| self.game_status = "idle" | ||
|
|
||
| def can_start(self): | ||
| return check_image(self.start_img_path) | ||
|
|
||
| def play(self): | ||
| err = start_game(self.start_img_path) | ||
| err = start_game(self, self.start_img_path) | ||
| if err: | ||
| return not err | ||
| start_game_msg(self.game) | ||
| self.run_game() | ||
| end_game() | ||
| end_game(self) | ||
|
|
||
| def run_game(self): | ||
| while not check_image("rc_items/gain_power.png"): | ||
| for _ in range(4): | ||
| self.game_status = "running" | ||
|
|
||
| try: | ||
| thread = ThreadWithReturnValue(target=check_image, | ||
| args=("rc_items/gain_power.png", True, self,)) | ||
| thread.start() | ||
| except: | ||
| print("Unable to start thread for checking image") | ||
| end_game(self, fail=True) | ||
|
Comment on lines
+205
to
+211
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. creates another thread to check if "gain_power" button is visible |
||
|
|
||
| while self.game_status == "running": | ||
| for i in range(8): | ||
| keyboard.press_and_release(random.choice(self.available_moves)) | ||
| time.sleep(0.125) | ||
| time.sleep(0.15) | ||
| keyboard.press_and_release("page up") # to prevent errors for the thread with check image | ||
|
Comment on lines
+213
to
+217
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checks if game status is still "running" (is set to "ended" if thread found the "gain_power" button) |
||
|
|
||
|
|
||
| class BotCoinFlip: | ||
| def __init__(self): | ||
| self.start_img_path = "rc_items/coinflip_gameimg.png" | ||
| self.game = "CoinFlip" | ||
| self.game_status = "idle" | ||
| self.coin_pos = [] | ||
| self.coin_items = { | ||
| "binance": [], | ||
|
|
@@ -148,76 +247,112 @@ def __init__(self): | |
| ("xml", cv2.imread("rc_items/coinflip_item_xml.png")), | ||
| ("tether", cv2.imread("rc_items/coinflip_item_tether.png")), | ||
| ] | ||
| self.card_image = [("card", cv2.imread("rc_items/coinflip_back.png"))] | ||
|
|
||
| def can_start(self): | ||
| return check_image(self.start_img_path) | ||
|
|
||
| def play(self): | ||
| err = start_game(self.start_img_path) | ||
| err = start_game(self, self.start_img_path) | ||
| if err: | ||
| return False | ||
| start_game_msg(self.game) | ||
|
|
||
| pyautogui.moveTo(100, 100) | ||
| keyboard.press_and_release("down") | ||
| time.sleep(3) | ||
|
|
||
| self.get_coin_fields() | ||
| self.check_coins() | ||
| self.match_coins() | ||
| end_game() | ||
| end_game(self) | ||
| return True | ||
|
|
||
| def get_coin_fields(self): | ||
| self.game_status = "running" | ||
|
|
||
| screen = cv2.imread(screen_grab()) | ||
| matches = matchTemplates( | ||
| [("card", cv2.imread("rc_items/coinflip_back.png"))], | ||
| screen, | ||
| N_object=float("inf"), | ||
| score_threshold=0.5, | ||
| #maxOverlap=0.25, | ||
| searchBox=None) | ||
| for i in range(len(matches['BBox'])): | ||
| self.coin_pos.append(matches['BBox'][i]) | ||
| matches = cv2.matchTemplate(screen, cv2.imread("rc_items/coinflip_back.png"), cv2.TM_CCOEFF_NORMED) | ||
|
|
||
| locations = numpy.where(matches >= .7) | ||
|
|
||
| append = self.coin_pos.append | ||
| thresholdDist = 30 | ||
|
|
||
| for pt in zip(*locations[::-1]): | ||
| if len(self.coin_pos) == 0 or notInList(self.coin_pos, thresholdDist, pt): | ||
| append(pt) | ||
|
|
||
| def check_coins(self): | ||
| ind = 0 | ||
| max_index = len(self.coin_pos) | ||
| while ind < max_index: | ||
| coin1_pos = self.coin_pos[ind] | ||
| coin2_pos = self.coin_pos[ind+1] | ||
| coin2_pos = self.coin_pos[ind + 1] | ||
|
|
||
| mouse_click(coin1_pos[0], coin1_pos[1], wait=0.1) | ||
| mouse_click(coin2_pos[0], coin2_pos[1], wait=0.3) | ||
|
|
||
| mouse_click(coin1_pos[0] + coin1_pos[2]/2, coin1_pos[1] + coin1_pos[3]/2, wait=0.1) | ||
| mouse_click(coin2_pos[0] + coin2_pos[2]/2, coin2_pos[1] + coin2_pos[3]/2, wait=0.3) | ||
| pyautogui.moveTo(100, 100) | ||
| screen = cv2.imread(screen_grab()) | ||
| matches = matchTemplates( | ||
| self.coin_images, | ||
| screen, | ||
| N_object=2, | ||
| score_threshold=.7, | ||
| maxOverlap=.25, | ||
| searchBox=None) | ||
|
|
||
| coin1 = (matches["TemplateName"][0], matches["BBox"][0]) | ||
| coin2 = (matches["TemplateName"][1], matches["BBox"][1]) | ||
|
|
||
| if coin1[0] == coin2[0]: | ||
| self.coin_items.pop(coin1[0]) | ||
|
|
||
| matches = [] | ||
| threads = [] | ||
| i = 1 | ||
|
|
||
| for template in self.coin_images: | ||
| try: | ||
| thread = ThreadWithReturnValue(target=matchTemplate, | ||
| args=(screen, template[1], template[0],)) | ||
| thread.start() | ||
| threads.append(thread) | ||
| # print("starting thread " + str(i) + " for matching " + template[0]) | ||
| i += 1 | ||
| except: | ||
| print("Couldn't start thread " + str(i) + " for matching " + template[0]) | ||
| end_game(self, fail=True) | ||
|
Comment on lines
+303
to
+313
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. creates one thread per template to improve the speed of matching |
||
|
|
||
| for thread in threads: | ||
| result = thread.join() | ||
| if len(result) > 0: | ||
| matches.append(result) | ||
|
Comment on lines
+315
to
+318
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. append each found match to a list |
||
|
|
||
| if len(matches) == 2: | ||
| coin1 = (matches[0][0][0], matches[0][0][1]) | ||
| coin2 = (matches[1][0][0], matches[1][0][1]) | ||
|
|
||
| if coin1[0] == coin2[0]: | ||
| self.coin_items.pop(coin1[0]) | ||
| else: | ||
| self.coin_items[coin1[0]].append(coin1[1]) | ||
| self.coin_items[coin2[0]].append(coin2[1]) | ||
| else: | ||
| self.coin_items[coin1[0]].append(coin1[1]) | ||
| self.coin_items[coin2[0]].append(coin2[1]) | ||
| if len(matches) == 1: | ||
| coin = (matches[0][0][0], matches[0][0][1]) | ||
| self.coin_items.pop(coin[0]) | ||
| else: | ||
| end_game(self, fail=True) | ||
|
|
||
| ind += 2 | ||
|
|
||
| def match_coins(self): | ||
| for coin in self.coin_items.values(): | ||
| if len(coin) == 2: | ||
| c1 = coin[0] | ||
| mouse_click(c1[0] + c1[2] / 2, c1[1] + c1[3] / 2, wait=0.05) | ||
| mouse_click(c1[0], c1[1], wait=0.05) | ||
|
|
||
| c2 = coin[1] | ||
| mouse_click(c2[0] + c2[2] / 2, c2[1] + c2[3] / 2, wait=0.05) | ||
| mouse_click(c2[0], c2[1], wait=0.05) | ||
| time.sleep(1) | ||
|
|
||
| keyboard.press_and_release("esc") | ||
|
|
||
|
|
||
| def main(): | ||
| Bots = [ | ||
| Bot2048, | ||
| BotCoinFlip] | ||
| BotCoinFlip | ||
| ] | ||
| global GAME_NUM | ||
| while True: | ||
| for bot in Bots: | ||
|
|
@@ -226,6 +361,9 @@ def main(): | |
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| if os.path.exists("imgs"): | ||
| shutil.rmtree('imgs') | ||
|
Comment on lines
+364
to
+365
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prevent "Program was not correctly closed last time" |
||
|
|
||
| setup() | ||
| try: | ||
| main() | ||
|
|
@@ -235,7 +373,7 @@ def main(): | |
|
|
||
| finally: | ||
| print("\nStatistics:\n", | ||
| "Time running: {!s}\n".format(datetime.datetime.now()-START_TIME), | ||
| "Played Games: {!s}\n".format(GAME_NUM+1) | ||
| "Time running: {!s}\n".format(datetime.datetime.now() - START_TIME), | ||
| "Played Games: {!s}\n".format(GAME_NUM) | ||
| ) | ||
| shutil.rmtree('imgs') | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
calculate distance between matches to prevent multiple matches on same coin