import os
import pandas as pd
import xml.etree.ElementTree as ET
import sys
import time
from colorama import init, Fore, Style
import hashlib
def typewriter(text, delay=0.1):
    """Write *text* to stdout one character at a time, then a newline.

    Args:
        text: The string to print.
        delay: Seconds to pause after each character. Zero or negative
            values disable the pause entirely.
    """
    for char in text:
        sys.stdout.write(char)
        # Flush per character so the effect is visible on buffered terminals.
        sys.stdout.flush()
        # time.sleep() raises ValueError for negative durations, so only
        # sleep when there is a positive delay to wait for.
        if delay > 0:
            time.sleep(delay)
    print()
def full_clean(directory):
    """Delete every file under *directory* whose name contains '_Forward' or '_Back'.

    The whole tree below *directory* is walked; sub-directories themselves
    are left in place.
    """
    markers = ('_Forward', '_Back')
    for folder, _subdirs, filenames in os.walk(directory):
        doomed = (name for name in filenames if any(tag in name for tag in markers))
        for name in doomed:
            os.remove(os.path.join(folder, name))
def partial_clean(directory):
    """Delete every file ending in '.xml' anywhere under *directory*.

    Files with any other extension, and the directories themselves, are kept.
    """
    for folder, _subdirs, filenames in os.walk(directory):
        for name in filter(lambda candidate: candidate.endswith('.xml'), filenames):
            os.remove(os.path.join(folder, name))
def _read_spreadsheet_rows(xml_path):
    """Return each SpreadsheetML ``Row`` of *xml_path* as a list of cell texts."""
    ns = '{urn:schemas-microsoft-com:office:spreadsheet}'
    rows = []
    for row in ET.parse(xml_path).getroot().iter(f'{ns}Row'):
        values = []
        for cell in row.findall(f'{ns}Cell'):
            # A Cell may carry a 1-based ss:Index when the empty cells before
            # it were omitted; pad so the value lands in the right column.
            index = cell.get(f'{ns}Index')
            if index is not None:
                values.extend([None] * (int(index) - 1 - len(values)))
            # An empty Cell has no Data child; record it as a missing value
            # instead of failing on `None.text`.
            data = cell.find(f'{ns}Data')
            values.append(None if data is None else data.text)
        rows.append(values)
    return rows


def convert_xml_to_xlsx(xml_path, xlsx_path):
    """Convert a SpreadsheetML (Excel 2003 XML) file into an .xlsx workbook.

    The first ``Row`` found is used as the column header and every following
    row becomes a data row. Cell values are taken as text, exactly as stored
    in the XML.

    Args:
        xml_path: Path of the XML spreadsheet to read.
        xlsx_path: Path of the .xlsx file to write (via openpyxl).

    Returns:
        True when the workbook was written, False on any failure. Failures
        are reported on stdout rather than raised.
    """
    try:
        rows = _read_spreadsheet_rows(xml_path)
        if not rows:
            # Without at least a header row there is nothing to convert.
            print(f"Error converting {xml_path} to {xlsx_path}: no rows found.")
            return False
        header, *records = rows
        df = pd.DataFrame(records, columns=header)
        with pd.ExcelWriter(xlsx_path, engine='openpyxl') as writer:
            df.to_excel(writer, index=False)
    except FileNotFoundError:
        print(f"Error: {xml_path} not found.")
        return False
    except ET.ParseError as e:
        print(f"Error parsing XML file {xml_path}: {str(e)}")
        return False
    except Exception as e:
        # Callers only look at the boolean result, so any other failure
        # (bad ss:Index, ragged rows, write errors) is reported the same way.
        print(f"Error converting {xml_path} to {xlsx_path}: {str(e)}")
        return False
    return True
def hash_columns(row, columns):
    """Return a SHA-1 hex digest identifying *row*'s values in *columns*.

    Args:
        row: Any mapping-like object indexable by column name (e.g. a
            pandas Series or a dict).
        columns: Column names whose values, in this order, form the digest.

    Returns:
        A 40-character hexadecimal string.
    """
    # Join with the ASCII Unit Separator so adjacent values cannot run
    # together: without a delimiter ('1', '23') and ('12', '3') would both
    # hash the text '123' and be treated as the same row.
    concatenated = '\x1f'.join(str(row[col]) for col in columns)
    return hashlib.sha1(concatenated.encode('utf-8')).hexdigest()
def _row_hashes(df, keys):
    """Return one hash_columns() digest per row of *df*, aligned to its index."""
    if df.empty:
        # DataFrame.apply(axis=1) on an empty frame returns a DataFrame, not
        # a Series, which cannot be assigned to a single column.
        return pd.Series([], index=df.index, dtype=object)
    return df.apply(lambda row: hash_columns(row, keys), axis=1)


def match_data(currency_pair, directory):
    """Pair up rows of ``<pair>_Forward.xml`` and ``<pair>_Back.xml``.

    Both XML files are converted to .xlsx, read back, and inner-joined on a
    hash of the key columns: the forward file's 'Back Result' is compared
    against the back file's 'Result', together with the shared parameter
    columns. Each matched row gets a zero-padded 7-digit 'key' and the result
    is passed to save_to_excel().

    Args:
        currency_pair: File-name prefix of the two reports.
        directory: Folder that contains the reports and receives the output.

    Returns:
        None. Returns early, after printing an error, when a conversion fails
        or a required column is missing.
    """
    print(Fore.YELLOW + "Converting XML to XLSX..." + Style.RESET_ALL)
    forward_xml_path = os.path.join(directory, f"{currency_pair}_Forward.xml")
    back_xml_path = os.path.join(directory, f"{currency_pair}_Back.xml")
    forward_xlsx_path = os.path.join(directory, f"{currency_pair}_Forward.xlsx")
    back_xlsx_path = os.path.join(directory, f"{currency_pair}_Back.xlsx")
    # Short-circuits: the back file is not converted if the forward one fails.
    if not (convert_xml_to_xlsx(forward_xml_path, forward_xlsx_path)
            and convert_xml_to_xlsx(back_xml_path, back_xlsx_path)):
        return

    print(Fore.YELLOW + "Reading individual files..." + Style.RESET_ALL)
    forward_df = pd.read_excel(forward_xlsx_path)
    back_df = pd.read_excel(back_xlsx_path)

    # Parameter columns present under the same name in both reports.
    shared_keys = [
        'PipStep', 'PipStepExponent', 'DelayTradeSequence',
        'ReverseSequenceDirection', 'LotSize', 'LotSizeExponent',
        'RsiTimeframe', 'RsiPeriod', 'RsiSellLevel', 'EmaTimeframe',
        'EmaPeriods', 'AdxTimeframe', 'AdxPeriod', 'AdxThreshold',
        'NewsAction',
    ]
    forward_keys = ['Back Result'] + shared_keys
    back_keys = ['Result'] + shared_keys

    # Fail with a readable message instead of a KeyError from inside apply().
    for label, frame, keys in (("Forward", forward_df, forward_keys),
                               ("Back", back_df, back_keys)):
        missing = [col for col in keys if col not in frame.columns]
        if missing:
            print(Fore.RED + f"Error: {label} file is missing column(s): {', '.join(missing)}"
                  + Style.RESET_ALL)
            return

    print(Fore.YELLOW + "Matching data using specified columns..." + Style.RESET_ALL)
    forward_df['hash'] = _row_hashes(forward_df, forward_keys)
    back_df['hash'] = _row_hashes(back_df, back_keys)
    # Columns present in both frames get the _forward/_back suffixes; columns
    # unique to one frame keep their original names.
    matched_df = forward_df.merge(
        back_df, on='hash', suffixes=('_forward', '_back'), how='inner'
    ).drop(columns=['hash'])

    print(Fore.YELLOW + "Generating key for matched data..." + Style.RESET_ALL)
    # 1-based running number, zero-padded to seven digits.
    matched_df['key'] = [f"{number:07d}" for number in range(1, len(matched_df) + 1)]

    print(Fore.YELLOW + "Saving matched data to Excel file..." + Style.RESET_ALL)
    save_to_excel(matched_df, currency_pair, directory)
def save_to_excel(df, currency_pair, directory):
    """Write the matched rows to ``<pair>_Matched.xlsx`` as two sheets.

    Sheet 'Forward' holds 'key' plus every column ending in '_forward' and
    the 'Forward Result' / 'Back Result' columns; sheet 'Back' holds 'key'
    plus every column ending in '_back' and the 'Result' column.

    Args:
        df: Matched frame; must contain a 'key' column.
        currency_pair: Prefix used for the output file name.
        directory: Folder in which the workbook is created.
    """
    output_path = os.path.join(directory, f"{currency_pair}_Matched.xlsx")
    forward_extras = ('Forward Result', 'Back Result')
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        forward_columns = ['key']
        for name in df.columns:
            if name.endswith('_forward') or name in forward_extras:
                forward_columns.append(name)
        df[forward_columns].to_excel(writer, index=False, sheet_name='Forward')

        back_columns = ['key']
        for name in df.columns:
            if name.endswith('_back') or name == 'Result':
                back_columns.append(name)
        df[back_columns].to_excel(writer, index=False, sheet_name='Back')
    print(Fore.GREEN + f"Matched data saved to {output_path}" + Style.RESET_ALL)
def _ask(prompt):
    """Show *prompt* in cyan and return the reply stripped and upper-cased."""
    return input(Fore.CYAN + prompt + Style.RESET_ALL).strip().upper()


def main():
    """Interactive loop: match report files for one currency pair at a time.

    For each pair entered, the folder
    ``~/Downloads/Automated MT5 Results/<PAIR>`` is looked up and, if it
    exists, handed to match_data(). When the user declines another run they
    are offered a full or partial clean-up before the closing message.
    """
    # colorama only needs initialising once, not on every loop iteration.
    init()
    # Build the path from separate components so the separator is correct on
    # every platform, not just Windows.
    results_root = os.path.join(os.path.expanduser("~"), "Downloads", "Automated MT5 Results")

    while True:
        currency_pair = _ask("Please enter the currency pair (e.g. EURNZD): ")
        directory = os.path.join(results_root, currency_pair)
        if not os.path.exists(directory):
            print(f"Error: Directory '{directory}' does not exist.")
        else:
            match_data(currency_pair, directory)

        if _ask("Do you want to match another set of files? (Y/N): ") == 'Y':
            continue

        if _ask("Do you want to clean up the folders? (Y/N): ") == 'Y':
            # NOTE(review): only the folder of the most recently entered pair
            # is cleaned, not every folder processed in this session.
            clean_type = _ask("Select the type of clean: Full (F) or Partial (P): ")
            if clean_type == 'F':
                full_clean(directory)
                print(Fore.GREEN + "Full clean performed successfully." + Style.RESET_ALL)
            elif clean_type == 'P':
                partial_clean(directory)
                print(Fore.GREEN + "Partial clean performed successfully." + Style.RESET_ALL)
            else:
                print(Fore.RED + "Invalid clean type selected. No cleanup performed." + Style.RESET_ALL)

        typewriter(
            Fore.GREEN + Style.BRIGHT + "All matching has been completed successfully." + Style.RESET_ALL)
        break
# Run the interactive workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
# Bookmarks