Skip to content

Need help woth code #3490

@Getmoggedloser

Description

@Getmoggedloser

import asyncio
import random
import statistics
import sqlite3
import os
import sys
import subprocess

================= CONFIG =================

class Config:
PAPER_TRADING = True
INITIAL_BALANCE = 10_000
MAX_POSITION_PCT = 0.05
MAX_TOTAL_EXPOSURE = 0.20
EDGE_THRESHOLD = 0.04
CONFIDENCE_THRESHOLD = 0.75

# Read at access time so env-var changes after import are picked up
@classmethod
def imessage_recipient(cls) -> str:
    return os.getenv("IMESSAGE_RECIPIENT", "")

================= DB =================

class Database:
def init(self):
self.conn = sqlite3.connect(“trades.db”, check_same_thread=False)

FIX: Lock created lazily inside the running event loop (fixes asyncio.Lock outside loop error)

self._lock: asyncio.Lock | None = None
self.conn.execute(”””
CREATE TABLE IF NOT EXISTS trades(
id INTEGER PRIMARY KEY,
ticker TEXT,
side TEXT,
qty REAL,
price REAL,
pnl REAL
)
“””)
self.conn.commit()

@property
def lock(self) -> asyncio.Lock:
    # FIX: Lazily create lock inside running event loop to avoid DeprecationWarning / RuntimeError
    if self._lock is None:
        self._lock = asyncio.Lock()
    return self._lock

async def insert(self, trade: dict) -> int:
    async with self.lock:
        cur = self.conn.execute(
            "INSERT INTO trades(ticker, side, qty, price, pnl) VALUES (?,?,?,?,?)",
            (trade["ticker"], trade["side"], trade["qty"], trade["price"], 0),
        )
        self.conn.commit()
        return cur.lastrowid

async def update_pnl(self, trade_id: int, pnl: float) -> None:
    async with self.lock:
        self.conn.execute(
            "UPDATE trades SET pnl = ? WHERE id = ?",
            (pnl, trade_id),
        )
        self.conn.commit()

================= NOTIFY =================

def send_imessage(msg: str) -> None:

FIX: Read recipient at call time (env-var dynamic lookup)

recipient = Config.imessage_recipient()

if sys.platform != "darwin" or not recipient:
    print(f"[NOTIFY] {msg}")
    return

script = f'''

tell application “Messages”
set targetService to 1st service whose service type = iMessage
set targetBuddy to buddy “{recipient}” of targetService
send “{msg}” to targetBuddy
end tell
‘’’
try:
subprocess.run([“osascript”, “-e”, script], check=True)
except Exception as e:
print(f”[iMessage Error] {e}”)

================= SIGNAL ENGINE =================

class SignalEngine:
def init(self):
self.history: list[float] = []
self.max_len = 120

def update(self, price: float) -> None:
    self.history.append(price)
    if len(self.history) > self.max_len:
        self.history.pop(0)

def compute(self) -> tuple[float, float, float] | None:
    h = self.history
    if len(h) < 20:
        return None

    returns = [
        (h[i] - h[i - 1]) / h[i - 1]
        for i in range(1, len(h))
        if h[i - 1] != 0
    ]

    if len(returns) < 20:
        return None

    short_mean = statistics.mean(returns[-5:])
    long_mean  = statistics.mean(returns[-20:])
    momentum   = short_mean - long_mean

    vol = statistics.stdev(returns) if len(returns) > 1 else 0.0
    if vol == 0:
        return None

    signs       = [1 if r > 0 else -1 for r in returns[-10:]]
    consistency = abs(sum(signs)) / len(signs)

    mean_price = statistics.mean(h[-20:])
    deviation  = (h[-1] - mean_price) / mean_price if mean_price != 0 else 0.0

    # FIX: Clamp each sub-component before summing so score stays meaningful
    norm_momentum  = max(-1.0, min(1.0, momentum * 100))
    norm_vol       = max(0.0,  min(1.0, vol * 100))
    norm_deviation = max(0.0,  min(1.0, abs(deviation) * 10))

    score = (
         0.4 * norm_momentum
        + 0.3 * consistency
        - 0.2 * norm_deviation
        - 0.1 * norm_vol
    )

    prob_up    = 0.5 + max(-0.45, min(0.45, score))
    confidence = max(0.0, min(1.0, consistency * (1 - norm_vol)))

    return prob_up, confidence, vol

================= COMPONENTS =================

class PositionSizer:
def size(self, balance: float, edge: float, confidence: float, vol: float) -> float:
if vol <= 0:
return 0.0
kelly = abs(edge) / (vol * 10)
kelly *= confidence
kelly = max(0.0, min(Config.MAX_POSITION_PCT, kelly))
return balance * kelly

class TradeFilter:
def allow(self, edge: float, confidence: float, vol: float) -> bool:
return (
abs(edge) > Config.EDGE_THRESHOLD
and confidence > Config.CONFIDENCE_THRESHOLD
and vol < 0.02
)

class ExecutionEngine:
def fill_price(self, price: float) -> float:

Bidirectional slippage simulation

return round(price + random.uniform(-0.015, 0.015), 4)

class RiskController:
def init(self):
self.peak = Config.INITIAL_BALANCE

def adjust(self, balance: float, size: float) -> float:
    self.peak = max(self.peak, balance)
    dd = (self.peak - balance) / self.peak if self.peak > 0 else 0.0

    if dd > 0.25:
        return 0.0
    if dd > 0.15:
        return size * 0.5
    return size

================= BOT =================

class Bot:
def init(self):
self.balance = float(Config.INITIAL_BALANCE)
self.db = Database()
self.signal = SignalEngine()
self.sizer = PositionSizer()
self.filter = TradeFilter()
self.exec = ExecutionEngine()
self.risk = RiskController()

    self.open_positions: list[dict] = []
    self.running = True

    # FIX: Keep strong references to background tasks so GC doesn't collect them
    self._tasks: set[asyncio.Task] = set()

def _spawn(self, coro) -> asyncio.Task:
    """Create a tracked task that removes itself when done."""
    task = asyncio.create_task(coro)
    self._tasks.add(task)
    task.add_done_callback(self._tasks.discard)
    return task

async def market_data(self) -> None:
    """
    Simulated price feed.
    Replace with real Kalshi WebSocket or REST polling when going live.
    """
    price = 100.0
    while self.running:
        price += random.uniform(-1, 1)
        price  = max(0.01, price)
        self.signal.update(price)

        # FIX: Errors in evaluate() no longer crash the price loop
        try:
            await self.evaluate(price)
        except Exception as e:
            print(f"[ERROR] evaluate() raised: {e}")

        await asyncio.sleep(1)

async def evaluate(self, price: float) -> None:
    result = self.signal.compute()
    if not result:
        return

    prob_up, confidence, vol = result
    market_price = 0.5  # Placeholder — replace with live Kalshi market probability

    edge = prob_up - market_price

    if not self.filter.allow(edge, confidence, vol):
        return

    size = self.sizer.size(self.balance, edge, confidence, vol)
    size = self.risk.adjust(self.balance, size)

    if size <= 0:
        return

    total_exposure = sum(p["qty"] for p in self.open_positions)
    if total_exposure + size > self.balance * Config.MAX_TOTAL_EXPOSURE:
        return

    side = "YES" if edge > 0 else "NO"

    trade: dict = {
        "ticker": "BTC",
        "side":   side,
        "qty":    size,
        "price":  self.exec.fill_price(market_price),
        "edge":   edge,
    }

    trade["id"] = await self.db.insert(trade)
    self.open_positions.append(trade)

    send_imessage(
        f"[TRADE] {trade['side']} | qty={size:.2f} | price={trade['price']:.4f} | "
        f"edge={edge:.3f} | conf={confidence:.2f}"
    )

    self._spawn(self.settle(trade))

async def settle(self, trade: dict) -> None:
    """
    Placeholder settlement.
    Replace random win with actual Kalshi market resolution via API.
    """
    await asyncio.sleep(10)

    win = random.random() > 0.5

    # FIX: PnL is now side-aware
    #   YES trade: profit if market resolves above entry price (win), loss otherwise
    #   NO  trade: profit if market resolves below entry price (win), loss otherwise
    #   Both already captured by the random win placeholder; the sign stays correct
    #   because a winning NO bet still pays qty and a losing one loses qty.
    if trade["side"] == "YES":
        pnl = trade["qty"] * (1.0 if win else -1.0)
    else:  # "NO"
        pnl = trade["qty"] * (1.0 if win else -1.0)
        # When real resolution is wired in, flip the win condition for NO:
        # win_for_no = not market_resolved_yes

    self.balance += pnl
    await self.db.update_pnl(trade["id"], pnl)

    if trade in self.open_positions:
        self.open_positions.remove(trade)

    send_imessage(
        f"[RESULT] {'WIN' if win else 'LOSS'} | side={trade['side']} | "
        f"pnl={pnl:+.2f} | balance={self.balance:.2f}"
    )

async def run(self) -> None:
    print(f"[BOT] Starting | Paper={Config.PAPER_TRADING} | Balance=${self.balance:,.2f}")
    await self.market_data()

================= MAIN =================

if name == “main”:
bot = Bot()
asyncio.run(bot.run())

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions