Package piktok
Expand source code
import asyncio
import atexit
from aiohttp import ClientSession, TCPConnector
from .base import INFO, SUCCESS, s_print
from .discover import Discover
from .suggested import Suggested
from .tiktoks import TikToks
from .info import Info
from .verifier import Verifier
class App:
"""
Main application class. Initialize with app = App(proxy) to start.
"""
_session: ClientSession = None
@classmethod
def __create(cls):
if cls._session is None:
cls._session = ClientSession(connector=TCPConnector(verify_ssl=False))
def __init__(self, proxy: str = None):
"""
Args:
proxy (str): proxy server url
"""
App.__create()
s_print("HTTP session connected", INFO)
self._proxy = ""
if proxy:
s_print(f"Connecting to proxy at {proxy}", INFO)
self._proxy = proxy
self.discover = Discover(self._session, self._proxy)
self.suggested = Suggested(self._session, self._proxy)
verifier_object = Verifier(self._session, self._proxy)
self.tiktoks = TikToks(verifier_object)
self.info = Info(verifier_object)
s_print("Started component apps", INFO)
atexit.register(App.cleanup)
s_print("Initialized app!", SUCCESS)
@property
def proxy(self):
return self._proxy
@proxy.setter
def proxy(self, proxy: str):
self._proxy = proxy
@classmethod
def cleanup(cls):
s_print("Closing HTTP session...", INFO)
if cls._session:
asyncio.get_event_loop().create_task(cls._session.close())
s_print("App shut down.", SUCCESS)
Sub-modules
piktok.base
piktok.discover
piktok.info
piktok.stealth
piktok.suggested
piktok.tiktoks
piktok.verifier
Classes
class App (proxy: str = None)
-
Main application class. Initialize with app = App(proxy) to start.
Args
proxy
:str
- proxy server url
Expand source code
class App: """ Main application class. Initialize with app = App(proxy) to start. """ _session: ClientSession = None @classmethod def __create(cls): if cls._session is None: cls._session = ClientSession(connector=TCPConnector(verify_ssl=False)) def __init__(self, proxy: str = None): """ Args: proxy (str): proxy server url """ App.__create() s_print("HTTP session connected", INFO) self._proxy = "" if proxy: s_print(f"Connecting to proxy at {proxy}", INFO) self._proxy = proxy self.discover = Discover(self._session, self._proxy) self.suggested = Suggested(self._session, self._proxy) verifier_object = Verifier(self._session, self._proxy) self.tiktoks = TikToks(verifier_object) self.info = Info(verifier_object) s_print("Started component apps", INFO) atexit.register(App.cleanup) s_print("Initialized app!", SUCCESS) @property def proxy(self): return self._proxy @proxy.setter def proxy(self, proxy: str): self._proxy = proxy @classmethod def cleanup(cls): s_print("Closing HTTP session...", INFO) if cls._session: asyncio.get_event_loop().create_task(cls._session.close()) s_print("App shut down.", SUCCESS)
Static methods
def cleanup()
-
Expand source code
@classmethod def cleanup(cls): s_print("Closing HTTP session...", INFO) if cls._session: asyncio.get_event_loop().create_task(cls._session.close()) s_print("App shut down.", SUCCESS)
Instance variables
var proxy
-
Expand source code
@property def proxy(self): return self._proxy