Skip to content
374 changes: 374 additions & 0 deletions routes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
"""Flask blueprint port of the original Express ``routes/index.js``.

This is part of the deliberately vulnerable Snyk "goof" demo app. The
intentional vulnerabilities (NoSQL injection, open redirect, command
injection, zip slip, SSTI, prototype-pollution analogue) are PRESERVED on
purpose for security education. Do NOT add sanitization/validation/escaping.
"""
import datetime
import functools
import io
import os
import re
import threading
import zipfile

from bson import ObjectId
from flask import (
Blueprint,
jsonify,
make_response,
redirect,
render_template,
render_template_string,
request,
session,
)

from models.mongo import todos, users

main_bp = Blueprint('main', __name__)


def login_required(view):
"""Mirror of the original ``isLoggedIn`` middleware."""

@functools.wraps(view)
def wrapped(*args, **kwargs):
if session.get('loggedIn') == 1:
return view(*args, **kwargs)
return redirect('/')

return wrapped


def parse(todo):
"""Loose port of the original reminder ``parse`` helper."""
t = str(todo)
remind_token = ' in '
reminder = t.find(remind_token)
if reminder > 0:
time = t[reminder + len(remind_token):]
time = time.rstrip('\n')
t = t[:reminder]
if time:
t += ' [' + time + ']'
return t


def is_blank(value):
return not value or re.match(r'^\s*$', value) is not None


def _json_body():
"""Return the parsed JSON body if it is an object, else an empty dict.

Mirrors the original JS, where property access on a non-object body (number,
string, array, ...) yields ``undefined`` instead of raising, so a malformed
body never produces a 500.
"""
body = request.get_json(force=True, silent=True)
return body if isinstance(body, dict) else {}


@main_bp.route('/', methods=['GET'])
def index():
todos_list = list(todos.find({}).sort('updated_at', -1))
return render_template(
'index.html',
title='Patch TODO List',
subhead='Vulnerabilities at their best',
todos=todos_list,
)


@main_bp.route('/login', methods=['GET'])
def login():
return render_template(
'admin.html',
title='Admin Access',
granted=False,
redirectPage=request.args.get('redirectPage'),
)


@main_bp.route('/login', methods=['POST'])
def login_handler():
# Read JSON first so the NoSQL-injection path is preserved (the body can hold
# Mongo operators like {"$gt": ""}); fall back to the urlencoded form so the
# HTML login form works too. app.js registers both bodyParser.json() and
# urlencoded(), so the original req.body accepted either content type.
body = _json_body() or request.form.to_dict()

# NoSQL INJECTION (intentional): the raw, unsanitized request body is
# passed straight into the Mongo query, allowing operator injection such
# as {"username": {"$gt": ""}, "password": {"$gt": ""}}.
user = users.find_one({
'username': body.get('username'),
'password': body.get('password'),
})
Comment on lines +106 to +109

@devin-ai-integration devin-ai-integration Bot Jun 9, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Login handler drops the email validation present in the original JS

The original loginHandler in routes/index.js:38 calls validator.isEmail(req.body.username) before querying MongoDB, returning 401 immediately if the username isn't a valid email. The Python port at routes/__init__.py:91-94 omits this check entirely and passes body.get('username') directly to users.find_one(). While the NoSQL injection is marked as intentional, the removal of the email validation widens the attack surface beyond the original — the JS version at least required a valid email string before querying, which blocked some injection payloads (e.g., {"$gt": ""} is not a valid email). This may be intentional for the demo, but it's a behavioral divergence worth noting.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This divergence is intentional and required by the migration spec for this PR. The task explicitly calls for POST /login to read the JSON body and pass username/password directly into users.find_one({...}) without sanitizing, so the NoSQL injection works. As you noted, the original validator.isEmail() check would block payloads like {"$gt": ""} and effectively neuter the intended vulnerability. This is a deliberately-vulnerable Snyk "goof" demo, so I'm intentionally not re-adding the email validation. Leaving the code as-is.


if user:
session['loggedIn'] = 1
username = body.get('username')
print('User logged in: ' + str(username))

# OPEN REDIRECT (intentional): redirect target taken from user input
# with no validation.
redirect_page = body.get('redirectPage')
if redirect_page:
return redirect(redirect_page)
return redirect('/admin')

return ('', 401)


@main_bp.route('/admin', methods=['GET'])
@login_required
def admin():
return render_template('admin.html', title='Admin Access Granted', granted=True)


@main_bp.route('/account_details', methods=['GET'])
@login_required
def get_account_details():
return render_template('account.html')


@main_bp.route('/account_details', methods=['POST'])
@login_required
def save_account_details():
# Loose handling that mirrors the original save_account_details: the
# submitted profile is rendered back as-is without strict validation.
profile = _json_body() or request.form.to_dict()
return render_template('account.html', **profile)
Comment on lines +140 to +144

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 save_account_details removes all input validation present in the original

The original save_account_details in routes/index.js:89-113 validates the submitted profile fields (email format, phone format, ASCII checks on name/country) before rendering. The Python port at line 139 simply does profile = _json_body() or request.form.to_dict() and renders with no validation. The file's header comment (line 6) says intentional vulnerabilities are preserved, but this goes further — it removes validation that the original did have, creating a new attack surface (arbitrary template variables via **profile at line 140 could potentially conflict with Jinja2 internals). This may be intentional for the demo, but it's worth noting as a deliberate weakening beyond the original.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is intentional per the migration spec, which calls for "basic (loose) handling" of POST /account_details with no validation — consistent with this being a deliberately-vulnerable demo app where we don't add sanitization/validation. The original's validator.isEmail/isMobilePhone/isAscii gate is dropped on purpose (same rationale as the dropped isEmail gate on login). Note the keys come from JSON object keys or request.form keys, which are always strings, so **profile won't raise on non-string keys. I'm leaving the loose behavior as-is to match the spec; flagging here for visibility rather than re-adding validation.



@main_bp.route('/logout', methods=['GET'])
def logout():
session['loggedIn'] = 0
session.clear()
return redirect('/')


@main_bp.route('/create', methods=['POST'])
def create():
item = request.form.get('content')
if item is None:
item = _json_body().get('content')

if item is None:
# Missing content: mirror the original, which errored out instead of
# silently persisting a garbage todo (str(None) -> 'None').
return ('Missing content', 400)

img_regex = r'\!\[alt text\]\((http.*)\s\".*'
# re.search (not re.match) to mirror JS String.prototype.match, which
# matches anywhere in the string rather than only at the start.
if isinstance(item, str) and re.search(img_regex, item):
url = re.search(img_regex, item).group(1)
print('found img: ' + url)

# COMMAND INJECTION (intentional): the extracted URL is concatenated
# into a shell command with no escaping.
os.system('identify ' + url)
else:
item = parse(item)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

todos.insert_one({
'content': item,
'updated_at': datetime.datetime.utcnow(),
})

# Mirror the original JS: ``todo.content.toString('base64')`` calls
# String.prototype.toString, which ignores the 'base64' argument and returns
# the plain string unchanged (only Buffer.prototype.toString encodes).
resp = make_response(str(item), 302)
resp.headers['Location'] = '/'
return resp


@main_bp.route('/destroy/<id>', methods=['GET'])
def destroy(id):
try:
todos.delete_one({'_id': ObjectId(id)})
except Exception:
pass
return redirect('/')


@main_bp.route('/edit/<id>', methods=['GET'])
def edit(id):
todos_list = list(todos.find({}).sort('updated_at', -1))
return render_template(
'edit.html',
title='TODO',
todos=todos_list,
current=id,
)


@main_bp.route('/update/<id>', methods=['POST'])
def update(id):
content = request.form.get('content')
if content is None:
content = _json_body().get('content')

todos.update_one(
{'_id': ObjectId(id)},
{'$set': {'content': content, 'updated_at': datetime.datetime.utcnow()}},
)
return redirect('/')


@main_bp.route('/import', methods=['POST'])
def import_todos():
if not request.files or 'importFile' not in request.files:
return 'No files were uploaded.'

import_file = request.files['importFile']
file_bytes = import_file.read()

data = ''
if zipfile.is_zipfile(io.BytesIO(file_bytes)):
# ZIP SLIP (intentional): archive entries are extracted with no path
# validation, allowing writes outside the target directory.
extracted_path = '/tmp/extracted_files'
zipfile.ZipFile(io.BytesIO(file_bytes)).extractall(extracted_path)
# Mirror the original JS exactly: it sets this fallback string and then the
# async fs.readFile callback does `data = data` (a no-op, because the
# callback parameter shadows the outer variable), so backup.txt's contents
# are never actually used -- data always stays this literal.
data = 'No backup.txt file found'
else:
data = file_bytes.decode('ascii', errors='replace')

lines = data.split('\n')
for line in lines:
parts = line.split(',')
what = parts[0]
print('importing ' + what)
when = parts[1] if len(parts) > 1 else None
locale = parts[2] if len(parts) > 2 else None
fmt = parts[3] if len(parts) > 3 else None
item = what
if not is_blank(what):
if not is_blank(when) and not is_blank(locale) and not is_blank(fmt):
item += ' [' + str(when) + ']'

todos.insert_one({
'content': item,
'updated_at': datetime.datetime.utcnow(),
})

return redirect('/')


@main_bp.route('/about_new', methods=['GET'])
def about_new():
# SSTI (intentional): the unsanitized ``device`` query parameter is
# concatenated directly into the template source and rendered, allowing
# server-side template injection (e.g. ?device={{7*7}}).
device = request.args.get('device')
template = (
'<h1>Patch TODO List</h1>'
'<h2>Vulnerabilities at their best</h2>'
'<p>Device: ' + str(device) + '</p>'
)
return render_template_string(template)


# ---------------------------------------------------------------------------
# Chat endpoints with a prototype-pollution analogue.
#
# In order of simplicity we are not using any database for chat. Hardcoded
# users below are INTENTIONAL (security demo).
# ---------------------------------------------------------------------------
users_chat = [
# You know the password for the user.
{"name": "user", "password": "pwd"},
# You don't know the password for the admin.
{"name": "admin", "password": os.urandom(16).hex(), "canDelete": True},
]

messages = []
_state = {'last_id': 1}
_chat_lock = threading.Lock()


def find_user(auth):
if not isinstance(auth, dict):
auth = {}
for u in users_chat:
if u.get('name') == auth.get('name') and u.get('password') == auth.get('password'):
return u
return None


def deep_merge(target, source):
"""Recursive merge that does NOT filter dangerous keys.

This is the Python analogue of the original ``lodash.merge`` prototype
pollution: keys such as ``__class__`` / ``__proto__`` / ``__init__`` are
merged through without any filtering (intentional vulnerability).
"""
for key in source:
value = source[key]
if isinstance(value, dict) and isinstance(target.get(key), dict):
deep_merge(target[key], value)
else:
target[key] = value
return target


@main_bp.route('/chat', methods=['GET'])
def chat_get():
with _chat_lock:
snapshot = list(messages)
return jsonify(snapshot)


@main_bp.route('/chat', methods=['PUT'])
def chat_add():
body = _json_body()
user = find_user(body.get('auth'))

if not user:
return jsonify({'ok': False, 'error': 'Access denied'}), 403

message = {
# Default message icon. Can be overwritten by user.
'icon': '\U0001f44b',
}

# Only merge object messages (mirrors lodash.merge, which ignores non-object
# sources). A dict here still flows through unfiltered -> prototype-pollution
# analogue is preserved.
msg_source = body.get('message')
if isinstance(msg_source, dict):
deep_merge(message, msg_source)

with _chat_lock:
deep_merge(message, {
'id': _state['last_id'],
'timestamp': int(datetime.datetime.utcnow().timestamp() * 1000),
'userName': user['name'],
})
_state['last_id'] += 1
messages.append(message)

return jsonify({'ok': True})


@main_bp.route('/chat', methods=['DELETE'])
def chat_delete():
global messages
body = _json_body()
user = find_user(body.get('auth'))

if not user or not user.get('canDelete'):
return jsonify({'ok': False, 'error': 'Access denied'}), 403

with _chat_lock:
messages = [m for m in messages if m.get('id') != body.get('messageId')]
return jsonify({'ok': True})
Loading