#!/usr/bin/env python
# coding: utf-8

# In[1]:


import datetime
import itertools

import numpy as np
import pandas as pd

from signals import *  # TODO: remove later


# In[2]:


class voter_v2():
    """Placeholder for a second voter implementation; nothing is implemented yet."""

    def __init__(self, name):
        self.name = name

    def createPredictMatrixBySignals(self, signalsName):
        """Not implemented: currently does nothing and returns None."""
        pass


# In[ ]:


# In[3]:


# Signal configurations keyed by signal name.  `signal_BB` comes from the
# star import of the project-local `signals` module.
reqSig = {
    'BB1': {
        'className': signal_BB,
        'indParams': {'MeanType': 'SMA', 'window': 15, 'valueType': 'close', 'kDev': 2.5},
        'signalParams': {'source': 'close', 'target': 'close'},
        'batchSize': 15
    },
    'BB2': {
        'className': signal_BB,
        'indParams': {'MeanType': 'SMA', 'window': 15, 'valueType': 'close', 'kDev': 2.5},
        'signalParams': {'source': 'close', 'target': 'close'},
        'batchSize': 20
    }
}


# In[4]:


reqCreate = reqSig.keys()
reqCreate


# In[5]:


class Voter():
    """Count observed outcomes per combination of indicator values.

    For each outcome ('up', 'down', 'none') the voter keeps one DataFrame in
    ``self.mop``.  Every frame has one row per combination of indicator
    values taken from {-1, 0, 1}, plus an 'amount' counter column and a
    'percentage' column.  'percentage' is initialised to 0 and is not updated
    by any method of this class.

    Typical flow: ``addValue`` stores the current indicator values and picks
    a decision; ``setDecision`` later records the outcome that really
    happened so that the matching counter grows.
    """

    _OUTCOMES = ('up', 'down', 'none')
    _LEVELS = (-1, 0, 1)

    def __init__(self, name=''):
        self.name = name
        # One counting table per outcome; filled in by createNewMop().
        self.mop = {
            'up': pd.DataFrame(),
            'down': pd.DataFrame(),
            'none': pd.DataFrame()
        }
        self.value = {}          # current indicator values: {indicator: level}
        self.decision = ''       # outcome chosen by getDecision()
        self.real_decision = ''  # outcome reported through setDecision()
        self.keys = []           # indicator names the tables were built for
        self.slice_dict = {}     # per-outcome table row matching self.value

    def addValue(self, dic_value):
        """Store new indicator values and recompute the decision.

        Args:
            dic_value: mapping of indicator name to its current level.
        """
        self.value = dic_value
        self.checkForNew()
        self.setSlice()
        self.getDecision()

    def checkForNew(self):
        """Rebuild the tables when the set of indicator names has changed."""
        # Compare as sets: rows are matched by column name, so a mere change
        # in key order must not throw away the counters collected so far.
        if set(self.value) != set(self.keys):
            self.createNewMop(list(self.value.keys()))

    def createNewMop(self, missing_indicators):
        """Create fresh, zeroed tables for the given indicator names.

        Args:
            missing_indicators: list of indicator names that become the
                leading columns of every table.
        """
        print('reassembly mop')
        indicators = list(missing_indicators)
        self.keys = indicators
        columns = indicators + ['amount', 'percentage']
        # product() varies the last position fastest; reversing each
        # combination makes the first indicator vary fastest instead, which
        # keeps the row order of the tables stable.
        rows = [
            list(reversed(combo)) + [0, 0]
            for combo in itertools.product(self._LEVELS, repeat=len(indicators))
        ]
        template = pd.DataFrame(rows, columns=columns)
        for outcome in self._OUTCOMES:
            # Independent copy per outcome so that the counters do not alias.
            self.mop[outcome] = template.copy()

    def _matchMask(self, frame):
        """Return a boolean mask of the rows of *frame* equal to self.value."""
        mask = pd.Series(True, index=frame.index, dtype=bool)
        for key, level in self.value.items():
            mask &= frame[key] == level
        return mask

    def setSlice(self):
        """Cache, per outcome, the table row matching the current values.

        Outcomes whose table has no matching row are left out of
        ``self.slice_dict``.
        """
        self.slice_dict = {}
        for outcome, frame in self.mop.items():
            if frame.empty:
                continue
            matches = frame[self._matchMask(frame)]
            if not matches.empty:
                self.slice_dict[outcome] = dict(matches.iloc[-1])

    def getDecision(self):
        """Pick the outcome with the largest 'amount' in the cached slices.

        Ties go to the outcome that comes last in ``self.slice_dict``.  When
        there are no cached slices the previous decision is returned.

        Returns:
            The chosen outcome name (also stored in ``self.decision``).
        """
        max_value = 0
        for key, value in self.slice_dict.items():
            if value['amount'] >= max_value:
                max_value = value['amount']
                self.decision = key
        return self.decision

    def setDecision(self, real_decision):
        """Record the outcome that really happened for the current values.

        Args:
            real_decision: one of the keys of ``self.mop``.

        Raises:
            KeyError: if *real_decision* is not a known outcome.
        """
        if real_decision not in self.mop:
            raise KeyError(
                f'unknown decision {real_decision!r}; expected one of {list(self.mop)}'
            )
        self.real_decision = real_decision
        self.updMop()
        # Keep the cached slice in step with the table that was just updated.
        if real_decision in self.slice_dict:
            self.slice_dict[real_decision]['amount'] += 1

    def updMop(self):
        """Increment 'amount' in the row of the real outcome's table that
        matches the current values."""
        frame = self.mop[self.real_decision]
        if frame.empty:
            return
        # Write through .loc: rows yielded by iterrows() may be copies, so
        # assigning to them is not guaranteed to change the frame.
        frame.loc[self._matchMask(frame), 'amount'] += 1


# In[6]:


test_dic_value_1 = {'lupa': 1}
test_dic_value_2 = {'lupa': 1, 'pupa': 1}
test_dic_value_3 = {'lupa': 1, 'pupa': 1, 'zalupa': 1, 'zapupa': 1}
test_dic_value_4 = {'lupa': 1, 'pupa': 1, 'zalupa': 1, 'zapupa': -1}


# In[7]:


test = Voter('huita')
test.addValue(test_dic_value_2)
test.decision
test.getDecision()


# In[8]:


test.setDecision('down')
test.getDecision()


# In[9]:


test.slice_dict


# In[ ]:


# In[10]:


import pickle


# In[11]:


dictionary_data = {"a": 1, "b": 2}

with open("data.pkl", "wb") as a_file:
    pickle.dump(dictionary_data, a_file)

with open("data.pkl", "rb") as a_file:
    output = pickle.load(a_file)
    print(output)


# In[ ]:


# In[ ]:


# In[12]:


arrays = [
    ["bar", "bar", "baz", "baz", "foo", "foo", "qux", "qux"],
    ["one", "two", "one", "two", "one", "two", "one", "two"],
]
tuples = list(zip(*arrays))
tuples


# In[13]:


index = pd.MultiIndex.from_tuples(tuples, names=["first", "second"])


# In[14]:


s = pd.DataFrame(np.random.randn(8), index=index)
s


# In[15]:


s.to_dict()


# In[16]:


# .loc must be subscripted; calling it treats the argument as an axis name.
s.loc[('bar', 'one')]


# In[18]:


iterables = [["up", "down", "none"], ["up", "down", "none"]]

df = pd.DataFrame(
    {'col1': np.random.randn(9), 'col2': np.random.randn(9)},
    index=pd.MultiIndex.from_product(iterables, names=["first", "second"])
)
df


# In[19]:


df.__dict__


# In[ ]:


# In[ ]:


# In[20]:


def createDF(namesIndex, namesColoms):
    """Build an all-None frame indexed by every combination of trend values.

    Args:
        namesIndex: names of the index levels; each level takes the values
            'up', 'none' and 'down', giving 3 ** len(namesIndex) rows.
        namesColoms: column labels passed to the DataFrame constructor.

    Returns:
        The constructed DataFrame.
    """
    trandeValuesList = ['up', 'none', 'down']
    n_rows = pow(3, len(namesIndex))

    frame = pd.DataFrame(
        {
            'trande': [None] * n_rows,
            'amaunt': [None] * n_rows,
            'probability': [None] * n_rows
        },
        index=pd.MultiIndex.from_product(
            [trandeValuesList] * len(namesIndex), names=namesIndex
        ),
        columns=namesColoms
    )
    return frame


# In[21]:


dd = createDF(['1', '2', '3'], ['trande', 'amaunt', 'probability'])
dd


# In[22]:


df.xs(('up', 'down'), level=['first', 'second'])


# In[23]:


dd['trande']


# In[24]:


tvl = ['up', 'none', 'down']
colomsName_lvl = ['trande', 'amaunt', 'probability']


# In[25]:


tuplesCol = list(zip(['amaunt'] * 3, tvl))
tuplesCol


# In[26]:


df.loc['up', 'down']


# In[27]:


df.xs(('up', 'down'), level=['first', 'second']).iloc[0]


# In[28]:


df_d = df.to_dict('tight')
df_d


# In[29]:


df_d['index_names']


# In[30]:


ddf = pd.DataFrame.from_dict(df_d, orient='tight')
ddf


# In[31]:


tuple([1, 2, 3])


# In[32]:


ddf.xs(('up', 'down'), level=['first', 'second']).iloc[0]


# In[ ]: