Filter
Exclude
Time range
-
Near
import json
import logging
from pathlib import Path

import pyotp
import requests

from firstrade import urls
from firstrade.exceptions import (
    AccountResponseError,
    LoginError,
    LoginRequestError,
    LoginResponseError,
)

logger = logging.getLogger(__name__)


class FTSession:
    """Session wrapper used to log into the Firstrade platform.

    The class drives the login request, the follow-up multi-factor
    authentication (PIN, email/phone OTP, or TOTP secret) and can optionally
    persist the ``ftat`` token to a JSON file so later logins can reuse it.

    Attributes:
        username (str): Firstrade login username.
        password (str): Firstrade login password.
        pin (str): Firstrade login pin ("" when unused).
        email (str): Masked Firstrade MFA email ("" when unused).
        phone (str): Firstrade MFA phone number ("" when unused).
        mfa_secret (str): Secret key for generating TOTP codes ("" when unused).
        profile_path (str | None): Directory in which the cookie JSON file is stored.
        save_session (bool): Persist the ``ftat`` token after a successful login.
        debug (bool): Log HTTP requests/responses if true.
            DO NOT POST YOUR LOGS ONLINE.
        t_token (str | None): Token returned by the login request, echoed back during MFA.
        otp_options (str | list[dict[str, str]] | None): OTP delivery options from the login response.
        login_json (dict): JSON body of the most recent login/MFA response.
        session (requests.Session): Underlying HTTP session.

    """

    def __init__(
        self,
        username: str = "",
        password: str = "",
        pin: str = "",
        email: str = "",
        phone: str = "",
        mfa_secret: str = "",
        profile_path: str | None = None,
        *,
        save_session: bool = False,
        debug: bool = False,
    ) -> None:
        """Initialize a new instance of the FTSession class.

        Args:
            username (str): Firstrade login username.
            password (str): Firstrade login password.
            pin (str, optional): Firstrade login pin.
            email (str, optional): Firstrade MFA email. Stored in masked form.
            phone (str, optional): Firstrade MFA phone number.
            mfa_secret (str, optional): Firstrade MFA secret key to generate TOTP.
            profile_path (str, optional): Directory where the cookie JSON file is saved.
            save_session (bool, optional): Save session cookies if true.
            debug (bool, optional): Log HTTP requests/responses if true.
                DO NOT POST YOUR LOGS ONLINE.

        Raises:
            ValueError: If ``email`` is given but is not of the form ``local@name.tld``.

        """
        self.username: str = username
        self.password: str = password
        self.pin: str = pin
        self.email: str = FTSession._mask_email(email) if email else ""
        self.phone: str = phone
        self.mfa_secret: str = mfa_secret
        self.profile_path: str | None = profile_path
        self.save_session: bool = save_session
        self.debug: bool = debug
        if self.debug:
            logging.basicConfig(level=logging.DEBUG)
            # Enable HTTP connection debug output
            import http.client as http_client

            http_client.HTTPConnection.debuglevel = 1
            # requests logging too
            urllib3_logger = logging.getLogger("requests.packages.urllib3")
            urllib3_logger.setLevel(logging.DEBUG)
            urllib3_logger.propagate = True
        self.t_token: str | None = None
        self.otp_options: str | list[dict[str, str]] | None = None
        self.login_json: dict[str, str] = {}
        self.session = requests.Session()

    def login(self) -> bool:
        """Validate and log into the Firstrade platform.

        Sets up the session headers, loads a saved ``ftat`` token if one
        exists, performs the login request and then runs the MFA step.

        Returns:
            bool: True when a verification code still has to be supplied via
            ``login_two``; False when the login is complete.

        Raises:
            LoginRequestError: If the login request fails with a non-200 status code.
            LoginResponseError: If the login response is not JSON or contains an error message.
            LoginError: If MFA is required but no MFA method was configured.

        """
        self.session.headers.update(urls.session_headers())
        ftat = self._load_cookies()
        if ftat:
            self.session.headers["ftat"] = ftat
        # The response of this initial GET is not used.
        # NOTE(review): presumably it primes the session before logging in - confirm.
        # requests rejects URLs without a scheme, so it must be spelled out.
        self._request("get", url="https://api3x.firstrade.com/", timeout=10)  # type: ignore[arg-type]
        self.session.headers["access-token"] = urls.access_token()

        data: dict[str, str] = {
            "username": self.username,
            "password": self.password,
        }
        response: requests.Response = self._request(
            method="post",
            url=urls.login(),
            data=data,
        )
        try:
            self.login_json = response.json()
        except json.decoder.JSONDecodeError as exc:
            error_msg = "Invalid JSON is your account funded?"
            raise LoginResponseError(error_msg) from exc

        # A response that already carries "ftat" and no "mfa" key means the
        # saved token was accepted; no MFA round trip is needed.
        if "mfa" not in self.login_json and "ftat" in self.login_json and not self.login_json["error"]:
            self.session.headers["sid"] = self.login_json["sid"]
            return False

        self.t_token = self.login_json.get("t_token")
        if not self.login_json.get("mfa"):
            self.otp_options = self.login_json.get("otp")
        if response.status_code != 200:
            raise LoginRequestError(response.status_code)
        if self.login_json["error"]:
            raise LoginResponseError(self.login_json["error"])

        need_code: bool = self._handle_mfa()
        if self.login_json["error"]:
            raise LoginResponseError(self.login_json["error"])
        if need_code:
            return True

        self.session.headers["ftat"] = self.login_json["ftat"]
        self.session.headers["sid"] = self.login_json["sid"]
        if self.save_session:
            self._save_cookies()
        return False

    def login_two(self, code: str) -> None:
        """Finish login to the Firstrade platform with a user-supplied code.

        Args:
            code (str): The MFA code (authenticator app) or OTP code
                (email/phone) entered by the user.

        Raises:
            LoginResponseError: If the verification response contains an error message.

        """
        data: dict[str, str | bytes | None]
        if self.login_json.get("mfa"):
            data = {
                "mfaCode": code,
                "remember_for": "30",
                "t_token": self.t_token,
            }
        else:
            data = {
                "otpCode": code,
                "verificationSid": self.session.headers["sid"],
                "remember_for": "30",
                "t_token": self.t_token,
            }
        response: requests.Response = self._request(method="post", url=urls.verify_pin(), data=data)
        self.login_json = response.json()
        if self.login_json["error"]:
            raise LoginResponseError(self.login_json["error"])
        self.session.headers["ftat"] = self.login_json["ftat"]
        self.session.headers["sid"] = self.login_json["sid"]
        if self.save_session:
            self._save_cookies()

    def delete_cookies(self) -> None:
        """Delete the saved session cookie file, if there is one."""
        # missing_ok: deleting cookies that were never saved is not an error.
        self._cookie_path().unlink(missing_ok=True)

    def get_tokens(self) -> dict[str, str | bytes | dict[str, str] | None]:
        """Return the current session tokens (access_token, ftat, sid and cookies)."""
        cookies: dict[str, str] = self.session.cookies.get_dict()
        return {
            "access-token": self.session.headers.get("access-token"),
            "ftat": self.session.headers.get("ftat"),
            "sid": self.session.headers.get("sid"),
            "cookies": cookies or "",
        }

    def build_session_from_tokens(self, tokens: dict[str, str | bytes | dict[str, str] | None]) -> None:
        """Build the session headers and cookies from provided tokens.

        Args:
            tokens (dict): Mapping as returned by ``get_tokens``. Header
                values that are not ``str``/``bytes`` and a ``cookies`` entry
                that is not a dict are ignored.

        """
        self.session.headers.update(urls.session_headers())
        if not tokens:
            return
        for header in ("access-token", "ftat", "sid"):
            value = tokens.get(header)
            if isinstance(value, (str, bytes)):
                self.session.headers[header] = value
        cookies = tokens.get("cookies")
        if isinstance(cookies, dict):
            self.session.cookies.update(cookies)  # type: ignore[arg-type]

    def _cookie_path(self) -> Path:
        """Return the path of the cookie file for the current username."""
        directory = Path(self.profile_path) if self.profile_path else Path()
        return directory / f"ft_cookies{self.username}.json"

    def _load_cookies(self) -> str | None:
        """Load the saved ``ftat`` token for the current username.

        Only the exact file written by ``_save_cookies`` is read, so a cookie
        file belonging to a different username is never picked up.

        Returns:
            str | None: The stored token, or "" when no cookie file exists.

        """
        path = self._cookie_path()
        if not path.is_file():
            return ""
        with path.open(mode="r") as f:
            return json.load(f)

    def _save_cookies(self) -> None:
        """Save the current ``ftat`` header to the cookie file.

        Does nothing unless ``save_session`` is enabled.
        """
        if not self.save_session:
            return
        path = self._cookie_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w") as f:
            json.dump(self.session.headers.get("ftat"), f)

    @staticmethod
    def _mask_email(email: str) -> str:
        """Mask the email for use in the API.

        Keeps the first character of the local part and of the domain name,
        each followed by four asterisks, e.g. ``john@gmail.com`` becomes
        ``j****@g****.com``.

        Args:
            email (str): The email address to be masked.

        Returns:
            str: The masked email address.

        Raises:
            ValueError: If the address does not contain exactly one "@" and
                exactly one "." in the domain part.

        """
        local, domain = email.split("@")
        masked_local: str = local[0] + "*" * 4
        # NOTE(review): domains with more than one dot (e.g. "example.co.uk")
        # raise ValueError here; the mask format the API uses for them is unknown.
        domain_name, tld = domain.split(".")
        masked_domain: str = domain_name[0] + "*" * 4
        return f"{masked_local}@{masked_domain}.{tld}"

    def _handle_mfa(self) -> bool:
        """Handle multi-factor authentication.

        Picks the MFA method from the configured credentials (pin, then
        email/phone OTP, then TOTP secret) and stores the response in
        ``login_json``.

        Returns:
            bool: True when a code still has to be supplied through
            ``login_two``; False when verification is already complete.

        Raises:
            LoginError: If no usable MFA method was provided.
            LoginResponseError: If the MFA response contains an error message.

        """
        data: dict[str, str | None] = {}
        if self.pin:
            self.login_json = self._handle_pin_mfa(data).json()
        elif (self.email or self.phone) and not self.login_json.get("mfa"):
            self.login_json = self._handle_otp_mfa(data).json()
        elif self.mfa_secret:
            self.login_json = self._handle_secret_mfa(data).json()
        elif self.login_json.get("mfa"):
            pass  # MFA handling without user provided secret in login_two
        else:
            error_msg = "MFA required but no valid MFA method was provided (pin, email/phone, or mfa_secret)."
            raise LoginError(error_msg)

        if self.login_json["error"]:
            raise LoginResponseError(self.login_json["error"])

        if self.pin or self.mfa_secret:
            self.session.headers["sid"] = self.login_json["sid"]
            return False
        if self.login_json.get("mfa") and not self.mfa_secret:
            return True
        # An OTP was requested: keep its verification id for login_two.
        self.session.headers["sid"] = self.login_json["verificationSid"]
        return True

    def _handle_pin_mfa(self, data: dict[str, str | None]) -> requests.Response:
        """Handle PIN-based MFA."""
        data.update({
            "pin": self.pin,
            "remember_for": "30",
            "t_token": self.t_token,
        })
        return self._request("post", urls.verify_pin(), data=data)

    def _handle_otp_mfa(self, data: dict[str, str | None]) -> requests.Response:
        """Handle email/phone OTP-based MFA.

        Raises:
            LoginResponseError: If the login response offered no OTP options,
                or none of them matches the configured email/phone.

        """
        if not self.otp_options:
            error_msg = "No OTP options available."
            raise LoginResponseError(error_msg)

        for item in self.otp_options:
            sms_match = item["channel"] == "sms" and self.phone and self.phone in item["recipientMask"]
            email_match = item["channel"] == "email" and self.email and self.email == item["recipientMask"]
            if sms_match or email_match:
                data.update({
                    "recipientId": item["recipientId"],
                    "t_token": self.t_token,
                })
                break
        else:
            # Without a recipient the request would go out with an empty payload.
            error_msg = "No OTP option matches the provided email/phone."
            raise LoginResponseError(error_msg)
        return self._request("post", urls.request_code(), data=data)

    def _handle_secret_mfa(self, data: dict[str, str | None]) -> requests.Response:
        """Handle MFA secret-based authentication."""
        mfa_otp = pyotp.TOTP(self.mfa_secret).now()
        data.update({
            "mfaCode": mfa_otp,
            "remember_for": "30",
            "t_token": self.t_token,
        })
        return self._request("post", urls.verify_pin(), data=data)

    def _request(self, method: str, url: str, **kwargs: object) -> requests.Response:
        """Send HTTP request and log the full response content if debug=True.

        Args:
            method (str): HTTP method name.
            url (str): Absolute URL to request.
            **kwargs: Passed through to ``requests.Session.request``.

        Returns:
            requests.Response: The response object.

        """
        resp = self.session.request(method, url, **kwargs)  # type: ignore[no-untyped-call]
        if self.debug:
            self._log_response(method, url, resp)
        return resp

    @staticmethod
    def _log_response(method: str, url: str, resp: requests.Response) -> None:
        """Write request/response details to the debug log (best effort)."""
        # Suppress urllib3 / http.client debug so we only see this log
        logging.getLogger("urllib3").setLevel(logging.WARNING)

        logger.debug(">>> %s %s", method.upper(), url)
        logger.debug("<<< Status: %s", resp.status_code)
        logger.debug("<<< Headers: %s", resp.headers)

        # Debug logging must never break the request itself, hence the
        # deliberately broad handlers below.
        try:
            logger.debug("<<< Raw bytes length: %s", len(resp.content))
        except Exception as exc:  # noqa: BLE001
            logger.debug("<<< Could not read raw bytes: %s", exc)

        try:
            pretty = json.dumps(resp.json(), indent=2)
            logger.debug("<<< JSON body:\n%s", pretty)
        except Exception:  # noqa: BLE001
            # Not JSON: fall back to the raw text body.
            try:
                logger.debug("<<< Body (text):\n%s", resp.text)
            except Exception as exc:  # noqa: BLE001
                logger.debug("<<< Could not read body text: %s", exc)

    def __getattr__(self, name: str) -> object:
        """Forward unknown attribute access to session object.

        Args:
            name (str): The name of the attribute to be accessed.

        Returns:
            The value of the requested attribute from the session object.

        Raises:
            AttributeError: If ``session`` itself has not been set yet.

        """
        # __getattr__ only runs when normal lookup failed, so reaching here
        # with "session" means it is unset; touching self.session again would
        # recurse forever.
        if name == "session":
            raise AttributeError(name)
        return getattr(self.session, name)


class FTAccountData:
    """Container for account information fetched through an ``FTSession``."""

    def __init__(self, session: FTSession) -> None:
        """Initialize a new instance of the FTAccountData class.

        Fetches the user info and the account list, then records every
        account number together with its total value.

        Args:
            session (FTSession): A logged-in session used for making HTTP requests.

        Raises:
            AccountResponseError: If the account list request returns a
                non-200 status or an error message.

        """
        self.session: FTSession = session
        self.all_accounts: dict[str, object] = {}
        self.account_numbers: list[str] = []
        self.account_balances: dict[str, object] = {}

        response: requests.Response = self.session._request("get", url=urls.user_info())
        self.user_info: dict[str, object] = response.json()

        response = self.session._request("get", urls.account_list())
        accounts = response.json()  # parse once; reused for the check and the data
        if response.status_code != 200 or accounts["error"]:
            raise AccountResponseError(accounts["error"])
        self.all_accounts = accounts
        for item in accounts["items"]:
            self.account_numbers.append(item["account"])
            self.account_balances[item["account"]] = item["total_value"]

    def get_account_balances(self, account: str) -> dict[str, object]:
        """Get account balances for a given account.

        Args:
            account (str): Account number of the account you want to get balances for.

        Returns:
            dict: Dict of the response from the API.

        """
        response: requests.Response = self.session._request("get", urls.account_balances(account))
        return response.json()

    def get_positions(self, account: str) -> dict[str, object]:
        """Get currently held positions for a given account.

        Args:
            account (str): Account number of the account you want to get positions for.

        Returns:
            dict: Dict of the response from the API.

        """
        response: requests.Response = self.session._request("get", urls.account_positions(account))
        return response.json()

    def get_account_history(
        self,
        account: str,
        date_range: str = "ytd",
        custom_range: list[str] | None = None,
    ) -> dict[str, object]:
        """Get account history for a given account.

        Args:
            account (str): Account number of the account you want to get history for.
            date_range (str): The range of the history. Defaults to "ytd".
                Available options are
                ["today", "1w", "1m", "2m", "mtd", "ytd", "ly", "cust"].
            custom_range (list[str] | None): The custom range of the history.
                Defaults to None. If range is "cust", this parameter is required.
                Format: ["YYYY-MM-DD", "YYYY-MM-DD"].

        Returns:
            dict: Dict of the response from the API.

        Raises:
            ValueError: If ``date_range`` is "cust" and ``custom_range`` is None.

        """
        if date_range == "cust" and custom_range is None:
            raise ValueError("Custom range required.")
        response: requests.Response = self.session._request(
            "get",
            urls.account_history(account, date_range, custom_range),
        )
        return response.json()

    def get_orders(self, account: str, per_page: int = 0) -> list[dict[str, object]]:
        """Retrieve existing order data for a given account.

        Args:
            account (str): Account number of the account to retrieve orders for.
            per_page (int): Number of orders to retrieve per page. Defaults to 0 (all orders).

        Returns:
            list: A list of dictionaries, each containing details about an order.

        """
        response: requests.Response = self.session._request("get", url=urls.order_list(account, per_page))
        return response.json()

    def cancel_order(self, order_id: str) -> dict[str, object]:
        """Cancel an existing order.

        Args:
            order_id (str): The order ID to cancel.

        Returns:
            dict: A dictionary containing the response data.

        """
        data = {
            "order_id": order_id,
        }
        response: requests.Response = self.session._request("post", url=urls.cancel_order(), data=data)
        return response.json()

    def get_balance_overview(self, account: str, keywords: list[str] | None = None) -> dict[str, object]:
        """Return a filtered, flattened view of useful balance fields.

        This is a convenience helper over `get_account_balances` to quickly
        surface likely relevant numbers such as cash, available cash, and
        buying power without needing to know the exact response structure.

        Args:
            account (str): Account number to query balances for.
            keywords (list[str], optional): Case-insensitive substrings to
                match in keys. When given they replace the default set
                (cash, avail, withdraw, buying, bp, equity, value, margin).

        Returns:
            dict: A dict mapping dot-notated keys to values from the balances
            response where the key path contains any of the keywords.

        """
        if keywords is None:
            keywords = [
                "cash",
                "avail",
                "withdraw",
                "buying",
                "bp",
                "equity",
                "value",
                "margin",
            ]
        # Key paths are lowercased before matching, so the keywords must be
        # lowercased too for the match to be case-insensitive.
        needles: list[str] = [keyword.lower() for keyword in keywords]

        payload: dict[str, object] = self.get_account_balances(account)
        filtered: dict[str, object] = {}

        def _walk(node: object, path: list[str]) -> None:
            """Depth-first walk recording leaves whose key path matches."""
            if isinstance(node, dict):
                for key, value in node.items():
                    _walk(value, [*path, str(key)])
            elif isinstance(node, list):
                for index, value in enumerate(node):
                    _walk(value, [*path, str(index)])
            else:
                key_path: str = ".".join(path)
                lowered: str = key_path.lower()
                if any(needle in lowered for needle in needles):
                    filtered[key_path] = node

        _walk(payload, [])
        return filtered

4
1
3,258
書籍「PythonでつくるWebアプリのつくり方」実践中! 7章のワンタイムパスワード認証に挑戦! PyOTPライブラリの使い方が分からなくて、パスワードの有効期間を思うように実装できませんでした… 姑息な手段で認証を突破しましたw #のんびり統計 #python #Webアプリ
1
35
1,231
過去投稿 投稿日:2025/04/15 こぴぺたん ログ 【Python】2段階認証コード(TOTP)を自作する方法【pyotpライブラリ活用】 #駆け出しエンジニアと繋がりたい c-a-p-engineer.github.io/tec…

2
94
過去投稿 投稿日:2025/04/15 こぴぺたん ログ 【Python】2段階認証コード(TOTP)を自作する方法【pyotpライブラリ活用】 #駆け出しエンジニアと繋がりたい c-a-p-engineer.github.io/tec…

2
103
過去投稿 投稿日:2025/04/15 こぴぺたん ログ 【Python】2段階認証コード(TOTP)を自作する方法【pyotpライブラリ活用】 #駆け出しエンジニアと繋がりたい c-a-p-engineer.github.io/tec…

2
118
過去投稿 投稿日:2025/04/15 こぴぺたん ログ 【Python】2段階認証コード(TOTP)を自作する方法【pyotpライブラリ活用】 #駆け出しエンジニアと繋がりたい c-a-p-engineer.github.io/tec…

2
1,216
過去投稿 投稿日:2025/04/15 こぴぺたん ログ 【Python】2段階認証コード(TOTP)を自作する方法【pyotpライブラリ活用】 #駆け出しエンジニアと繋がりたい c-a-p-engineer.github.io/tec…

1
85
過去投稿 投稿日:2025/04/15 こぴぺたん ログ 【Python】2段階認証コード(TOTP)を自作する方法【pyotpライブラリ活用】 #駆け出しエンジニアと繋がりたい c-a-p-engineer.github.io/tec…

1
76
𝗛𝗼𝘄 𝗧𝘄𝗼-𝗙𝗮𝗰𝘁𝗼𝗿 𝗔𝘂𝘁𝗵𝗲𝗻𝘁𝗶𝗰𝗮𝘁𝗶𝗼𝗻 (𝟮𝗙𝗔) 𝗪𝗼𝗿𝗸𝘀? Your users' passwords are getting leaked daily. Here's how 2FA protects your applications and why every engineer should understand the flow. When users log in, your system validates their credentials against your database. Pass? Great, but you're only at step one. 2FA adds a second verification layer that requires proof of physical device access. Think of it as if (𝚙𝚊𝚜𝚜𝚠𝚘𝚛𝚍_𝚟𝚊𝚕𝚒𝚍 && 𝚜𝚎𝚌𝚘𝚗𝚍_𝚏𝚊𝚌𝚝𝚘𝚛_𝚟𝚊𝚕𝚒𝚍) instead of just if (𝚙𝚊𝚜𝚜𝚠𝚘𝚛𝚍_𝚟𝚊𝚕𝚒𝚍). 𝗛𝗼𝘄 𝘁𝗼 𝗶𝗺𝗽𝗹𝗲𝗺𝗲𝗻𝘁 𝟮𝗙𝗔? 𝟭. 𝗧𝗢𝗧𝗣 (𝗧𝗶𝗺𝗲-𝗯𝗮𝘀𝗲𝗱 𝗢𝗻𝗲-𝗧𝗶𝗺𝗲 𝗣𝗮𝘀𝘀𝘄𝗼𝗿𝗱): Apps like Google Authenticator generate codes using a shared secret and current timestamp. The server runs the same algorithm and compares results. 𝟮. 𝗦𝗠𝗦/𝗘𝗺𝗮𝗶𝗹 𝗢𝗧𝗣: Your backend generates a random code, temporarily stores it, and sends it to the user's registered contact. This method is vulnerable to SIM swapping. 𝟯. 𝗕𝗶𝗼𝗺𝗲𝘁𝗿𝗶𝗰 𝘃𝗲𝗿𝗶𝗳𝗶𝗰𝗮𝘁𝗶𝗼𝗻 uses device-level authentication (Face ID, fingerprint) that never leaves the user's device. It is the strongest option when available. Password-only authentication has one attack vector. 2FA requires compromising both the password database and the user's physical device. For attackers, that's the difference between a single SQL injection and a coordinated social engineering campaign. If you're building user authentication, start with TOTP. Libraries like speakeasy (Node.js) or pyotp (Python) handle the complexity of crypto. Your users will thank you when the next breach hits your industry. #softwareengineering #programming #security
4
82
357
30,055
A new guy on a work trial has just shipped 2FA/MFA for web agents. Your agent can directly login for you without human intervention. We made that available in the open-source version. He included a tool for the agent to get the secret using pyotp. The OTP secret must be provided via .env.
4
2
65
5,578
با نصب pyotp هم کاملا مشکل دارم :) آدم برای یه لیوان شیر یه گاو نصب نمیکنه
2
1
6
278
30 May 2024
TOTPハム太郎から乱数の種を思いついたところで思わずツイートしたが、調べたらpyotpとかpam_oathとか便利そうなものを見つけて便利だなあってなってる net.itc.nagoya-u.ac.jp/~shim… wiki.archlinux.jp/index.php/…

4
1,654
16 Jan 2024
Day 2 of #100DaysOfCode. Exploring the #PyOTP module in Python for one-time password generation. I was able to create an authorization project that can produce a #OneTimePassword and integrates with the #Google Authenticator app. Both time-based and counter-based generation.
6
88
Automate Zerodha web including TOTP, you can fetch the cookies to get Intraday Data or your account info. I am using selenium in this example. TOTP part might be useful to others too, check pyotp part if you are interested youtu.be/fRlfcR__XHU?si=jQ5s…

1
5
810
Day 64/#100DaysOfCode Today, I created a two-step login system. I'm using the pyotp package to generate Time-based OTP. Well, I spent almost 6 hours figuring out how to build this kind of system. In the end, I was filled with happiness. I consider this an #accomplishment.
1
9
148
Replying to @gregyoung @github
$ pip install pyotp $ python >>> totp = pyotp.TOTP('<your-secret>') >>> totp.now() 123456 You do need your existing secret though, or a way to register a new one-time password generator with GitHub to get a new secret. No phone required.

2
146
Two-factor authentication adds an extra layer of security to your online accounts. And in this tutorial, @ashutoshkrris shows you how to set it up in your Flask apps. You'll use the PyOTP library and Google Authenticator & learn all about the 2FA flow. freecodecamp.org/news/how-to…
2
16
86
16,457
そしてフロント側にしても, #Python のPyOTPモジュールでワンタイムパスワードを得ることができるということか…(あまり使ってないTikTokアカウントの二段階認証で確認). #プログラミング
フロント側にGoogle Authenticator使ってみたけど,普通に対応できるな…ふむ. #TOTP #Python #programming
2
546