import argparse import pathlib import time import market_trade.constants import tinkoff_grpc.src.instruments import tinkoff_grpc.src.marketdata import tinkoff_grpc.src.channel import grpc # initializing argument parser argument_parser = argparse.ArgumentParser(prog="currencies saver") # this argument is used to save the currency trades into distinct dir argument_parser.add_argument("-o", "--output-directory", type=pathlib.Path, dest="output_directory", action="store") def get_all_currencies(channel: tinkoff_grpc.src.channel.Channel): """ This function takes channel and gets all currencies objects in channel :param channel: channel.Channel object :return: array of Currency object """ # initializing service instruments_service = tinkoff_grpc.src.instruments.InstrumentsService(channel) # getting currencies list currencies = instruments_service.get_currencies( instrument_status_name=market_trade.constants.DEFAULT_INSTRUMENT_STATUS ) return currencies if __name__ == '__main__': # getting the paths loaded cli_args = argument_parser.parse_args() if cli_args.output_directory: output_data_path = cli_args.output_directory else: raise NotImplementedError("We would like to have a path where we would save data") api_address = market_trade.constants.TINKOFF_API_ADDRESS token = market_trade.constants.TINKOFF_BEARER_TOKEN authorization_field = market_trade.constants.TINKOFF_AUTHORIZATION_HEADER with tinkoff_grpc.Channel(api_address=api_address, token=token, authorization_field=authorization_field) as tinkoff_channel: currencies = get_all_currencies(tinkoff_channel) while True: time.sleep(1) try: tinkoff_trades_saver = tinkoff_grpc.savers.TradesSaver(channel=tinkoff_channel, instruments=currencies, filepath=output_data_path) tinkoff_trades_saver.start() except grpc.RpcError as grpc_error: pass