From 8b4757ac05abdb7b1b151e185b8d17d1562d6108 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 06:23:03 +0000 Subject: [PATCH 1/6] feature: port routes/index.js to Flask blueprint (routes/__init__.py) Implement main_bp Blueprint with all handlers from the original Express routes/index.js, preserving intentional vulnerabilities (NoSQL injection, open redirect, command injection, zip slip, SSTI, prototype-pollution analogue) for the Snyk goof security-education app. Co-Authored-By: Eashan Sinha --- routes/__init__.py | 341 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 341 insertions(+) diff --git a/routes/__init__.py b/routes/__init__.py index e69de29bb2d..252218d4670 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -0,0 +1,341 @@ +"""Flask blueprint port of the original Express ``routes/index.js``. + +This is part of the deliberately vulnerable Snyk "goof" demo app. The +intentional vulnerabilities (NoSQL injection, open redirect, command +injection, zip slip, SSTI, prototype-pollution analogue) are PRESERVED on +purpose for security education. Do NOT add sanitization/validation/escaping. +""" +import base64 +import datetime +import functools +import io +import os +import re +import zipfile + +from bson import ObjectId +from flask import ( + Blueprint, + jsonify, + make_response, + redirect, + render_template, + render_template_string, + request, + session, +) + +from models.mongo import todos, users + +main_bp = Blueprint('main', __name__) + + +def login_required(view): + """Mirror of the original ``isLoggedIn`` middleware.""" + + @functools.wraps(view) + def wrapped(*args, **kwargs): + if session.get('loggedIn') == 1: + return view(*args, **kwargs) + return redirect('/') + + return wrapped + + +def parse(todo): + """Loose port of the original reminder ``parse`` helper.""" + t = str(todo) + remind_token = ' in ' + reminder = t.find(remind_token) + if reminder > 0: + time = t[reminder + len(remind_token):] + time = time.rstrip('\n') + t = t[:reminder] + if time: + t += ' [' + time + ']' + return t + + +def is_blank(value): + return not value or re.match(r'^\s*$', value) is not None + + +@main_bp.route('/', methods=['GET']) +def index(): + todos_list = list(todos.find({}).sort('updated_at', -1)) + return render_template( + 'index.html', + title='Patch TODO List', + subhead='Vulnerabilities at their best', + todos=todos_list, + ) + + +@main_bp.route('/login', methods=['GET']) +def login(): + return render_template( + 'admin.html', + title='Admin Access', + granted=False, + redirectPage=request.args.get('redirectPage'), + ) + + +@main_bp.route('/login', methods=['POST']) +def login_handler(): + body = request.get_json(force=True, silent=True) or {} + + # NoSQL INJECTION (intentional): the raw, unsanitized request body is + # passed straight into the Mongo query, allowing operator injection such + # as {"username": {"$gt": ""}, "password": {"$gt": ""}}. + user = users.find_one({ + 'username': body.get('username'), + 'password': body.get('password'), + }) + + if user: + session['loggedIn'] = 1 + username = body.get('username') + print('User logged in: ' + str(username)) + + # OPEN REDIRECT (intentional): redirect target taken from user input + # with no validation. + redirect_page = body.get('redirectPage') + if redirect_page: + return redirect(redirect_page) + return redirect('/admin') + + return ('', 401) + + +@main_bp.route('/admin', methods=['GET']) +@login_required +def admin(): + return render_template('admin.html', title='Admin Access Granted', granted=True) + + +@main_bp.route('/account_details', methods=['GET']) +@login_required +def get_account_details(): + return render_template('account.html') + + +@main_bp.route('/account_details', methods=['POST']) +@login_required +def save_account_details(): + # Loose handling that mirrors the original save_account_details: the + # submitted profile is rendered back as-is without strict validation. + profile = request.get_json(force=True, silent=True) or request.form.to_dict() + return render_template('account.html', **profile) + + +@main_bp.route('/logout', methods=['GET']) +def logout(): + session['loggedIn'] = 0 + session.clear() + return redirect('/') + + +@main_bp.route('/create', methods=['POST']) +def create(): + item = request.form.get('content') + if item is None: + body = request.get_json(force=True, silent=True) or {} + item = body.get('content') + + img_regex = r'\!\[alt text\]\((http.*)\s\".*' + if isinstance(item, str) and re.match(img_regex, item): + url = re.match(img_regex, item).group(1) + print('found img: ' + url) + + # COMMAND INJECTION (intentional): the extracted URL is concatenated + # into a shell command with no escaping. + os.system('identify ' + url) + else: + item = parse(item) + + todos.insert_one({ + 'content': item, + 'updated_at': datetime.datetime.utcnow(), + }) + + content_b64 = base64.b64encode(str(item).encode()).decode() + resp = make_response(content_b64, 302) + resp.headers['Location'] = '/' + return resp + + +@main_bp.route('/destroy/', methods=['GET']) +def destroy(id): + try: + todos.delete_one({'_id': ObjectId(id)}) + except Exception: + pass + return redirect('/') + + +@main_bp.route('/edit/', methods=['GET']) +def edit(id): + todos_list = list(todos.find({}).sort('updated_at', -1)) + return render_template( + 'edit.html', + title='TODO', + todos=todos_list, + current=id, + ) + + +@main_bp.route('/update/', methods=['POST']) +def update(id): + content = request.form.get('content') + if content is None: + body = request.get_json(force=True, silent=True) or {} + content = body.get('content') + + todos.update_one( + {'_id': ObjectId(id)}, + {'$set': {'content': content, 'updated_at': datetime.datetime.utcnow()}}, + ) + return redirect('/') + + +@main_bp.route('/import', methods=['POST']) +def import_todos(): + if not request.files or 'importFile' not in request.files: + return 'No files were uploaded.' + + import_file = request.files['importFile'] + file_bytes = import_file.read() + + data = '' + if zipfile.is_zipfile(io.BytesIO(file_bytes)): + # ZIP SLIP (intentional): archive entries are extracted with no path + # validation, allowing writes outside the target directory. + extracted_path = '/tmp/extracted_files' + zipfile.ZipFile(io.BytesIO(file_bytes)).extractall(extracted_path) + data = 'No backup.txt file found' + try: + with open('backup.txt', 'r') as f: + data = f.read() + except OSError: + pass + else: + data = file_bytes.decode('ascii', errors='replace') + + lines = data.split('\n') + for line in lines: + parts = line.split(',') + what = parts[0] + print('importing ' + what) + when = parts[1] if len(parts) > 1 else None + locale = parts[2] if len(parts) > 2 else None + fmt = parts[3] if len(parts) > 3 else None + item = what + if not is_blank(what): + if not is_blank(when) and not is_blank(locale) and not is_blank(fmt): + item += ' [' + str(when) + ']' + + todos.insert_one({ + 'content': item, + 'updated_at': datetime.datetime.utcnow(), + }) + + return redirect('/') + + +@main_bp.route('/about_new', methods=['GET']) +def about_new(): + # SSTI (intentional): the unsanitized ``device`` query parameter is + # concatenated directly into the template source and rendered, allowing + # server-side template injection (e.g. ?device={{7*7}}). + device = request.args.get('device') + template = ( + '

Patch TODO List

' + '

Vulnerabilities at their best

' + '

Device: ' + str(device) + '

' + ) + return render_template_string(template) + + +# --------------------------------------------------------------------------- +# Chat endpoints with a prototype-pollution analogue. +# +# In order of simplicity we are not using any database for chat. Hardcoded +# users below are INTENTIONAL (security demo). +# --------------------------------------------------------------------------- +users_chat = [ + # You know the password for the user. + {"name": "user", "password": "pwd"}, + # You don't know the password for the admin. + {"name": "admin", "password": os.urandom(16).hex(), "canDelete": True}, +] + +messages = [] +_state = {'last_id': 1} + + +def find_user(auth): + auth = auth or {} + for u in users_chat: + if u.get('name') == auth.get('name') and u.get('password') == auth.get('password'): + return u + return None + + +def deep_merge(target, source): + """Recursive merge that does NOT filter dangerous keys. + + This is the Python analogue of the original ``lodash.merge`` prototype + pollution: keys such as ``__class__`` / ``__proto__`` / ``__init__`` are + merged through without any filtering (intentional vulnerability). + """ + for key in source: + value = source[key] + if isinstance(value, dict) and isinstance(target.get(key), dict): + deep_merge(target[key], value) + else: + target[key] = value + return target + + +@main_bp.route('/chat', methods=['GET']) +def chat_get(): + return jsonify(messages) + + +@main_bp.route('/chat', methods=['PUT']) +def chat_add(): + body = request.get_json(force=True, silent=True) or {} + user = find_user(body.get('auth') or {}) + + if not user: + return jsonify({'ok': False, 'error': 'Access denied'}), 403 + + message = { + # Default message icon. Can be overwritten by user. + 'icon': '\U0001f44b', + } + + deep_merge(message, body.get('message') or {}) + deep_merge(message, { + 'id': _state['last_id'], + 'timestamp': int(datetime.datetime.utcnow().timestamp() * 1000), + 'userName': user['name'], + }) + _state['last_id'] += 1 + + messages.append(message) + return jsonify({'ok': True}) + + +@main_bp.route('/chat', methods=['DELETE']) +def chat_delete(): + global messages + body = request.get_json(force=True, silent=True) or {} + user = find_user(body.get('auth') or {}) + + if not user or not user.get('canDelete'): + return jsonify({'ok': False, 'error': 'Access denied'}), 403 + + messages = [m for m in messages if m.get('id') != body.get('messageId')] + return jsonify({'ok': True}) From a46cfead388dfc57ef6aebcb2f20a4ec44d33e6d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 06:28:34 +0000 Subject: [PATCH 2/6] bug: fix missing-content insert and chat last_id race in routes blueprint - create(): return 400 for missing content instead of silently inserting a 'None' todo (str(None)). - chat: guard _state['last_id'] read/increment, message append, and message-list rebuild with a threading.Lock to avoid duplicate ids under threaded Flask. Co-Authored-By: Eashan Sinha --- routes/__init__.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/routes/__init__.py b/routes/__init__.py index 252218d4670..9ccf5861b0d 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -11,6 +11,7 @@ import io import os import re +import threading import zipfile from bson import ObjectId @@ -143,6 +144,11 @@ def create(): body = request.get_json(force=True, silent=True) or {} item = body.get('content') + if item is None: + # Missing content: mirror the original, which errored out instead of + # silently persisting a garbage todo (str(None) -> 'None'). + return ('Missing content', 400) + img_regex = r'\!\[alt text\]\((http.*)\s\".*' if isinstance(item, str) and re.match(img_regex, item): url = re.match(img_regex, item).group(1) @@ -272,6 +278,7 @@ def about_new(): messages = [] _state = {'last_id': 1} +_chat_lock = threading.Lock() def find_user(auth): @@ -317,14 +324,16 @@ def chat_add(): } deep_merge(message, body.get('message') or {}) - deep_merge(message, { - 'id': _state['last_id'], - 'timestamp': int(datetime.datetime.utcnow().timestamp() * 1000), - 'userName': user['name'], - }) - _state['last_id'] += 1 - messages.append(message) + with _chat_lock: + deep_merge(message, { + 'id': _state['last_id'], + 'timestamp': int(datetime.datetime.utcnow().timestamp() * 1000), + 'userName': user['name'], + }) + _state['last_id'] += 1 + messages.append(message) + return jsonify({'ok': True}) @@ -337,5 +346,6 @@ def chat_delete(): if not user or not user.get('canDelete'): return jsonify({'ok': False, 'error': 'Access denied'}), 403 - messages = [m for m in messages if m.get('id') != body.get('messageId')] + with _chat_lock: + messages = [m for m in messages if m.get('id') != body.get('messageId')] return jsonify({'ok': True}) From b57be1d577e98d474880e5600c4e0f82e09c8164 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 06:34:21 +0000 Subject: [PATCH 3/6] bug: use re.search in create() to match JS str.match anywhere-in-string Python re.match only anchors at the start; the original item.match(imgRegex) searches anywhere. Using re.search restores detection of image markdown (and the intentional command injection) when content has leading text. Co-Authored-By: Eashan Sinha --- routes/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/routes/__init__.py b/routes/__init__.py index 9ccf5861b0d..31f5c06e0ba 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -150,8 +150,10 @@ def create(): return ('Missing content', 400) img_regex = r'\!\[alt text\]\((http.*)\s\".*' - if isinstance(item, str) and re.match(img_regex, item): - url = re.match(img_regex, item).group(1) + # re.search (not re.match) to mirror JS String.prototype.match, which + # matches anywhere in the string rather than only at the start. + if isinstance(item, str) and re.search(img_regex, item): + url = re.search(img_regex, item).group(1) print('found img: ' + url) # COMMAND INJECTION (intentional): the extracted URL is concatenated From 5314712a6d0be9c7743d3e79ade8b63df29dccff Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 06:46:57 +0000 Subject: [PATCH 4/6] bug: handle non-dict JSON bodies gracefully like the original JS - add _json_body() helper returning {} for non-object bodies and route all endpoints through it (login/account/create/update/chat), so a malformed body yields a normal response instead of a 500 AttributeError. - guard find_user against non-dict auth. - only deep_merge dict chat messages (mirrors lodash.merge ignoring non-object sources); dict messages still flow through unfiltered, preserving the prototype-pollution analogue. Co-Authored-By: Eashan Sinha --- routes/__init__.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/routes/__init__.py b/routes/__init__.py index 31f5c06e0ba..430fc89b64a 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -61,6 +61,17 @@ def is_blank(value): return not value or re.match(r'^\s*$', value) is not None +def _json_body(): + """Return the parsed JSON body if it is an object, else an empty dict. + + Mirrors the original JS, where property access on a non-object body (number, + string, array, ...) yields ``undefined`` instead of raising, so a malformed + body never produces a 500. + """ + body = request.get_json(force=True, silent=True) + return body if isinstance(body, dict) else {} + + @main_bp.route('/', methods=['GET']) def index(): todos_list = list(todos.find({}).sort('updated_at', -1)) @@ -84,7 +95,7 @@ def login(): @main_bp.route('/login', methods=['POST']) def login_handler(): - body = request.get_json(force=True, silent=True) or {} + body = _json_body() # NoSQL INJECTION (intentional): the raw, unsanitized request body is # passed straight into the Mongo query, allowing operator injection such @@ -126,7 +137,7 @@ def get_account_details(): def save_account_details(): # Loose handling that mirrors the original save_account_details: the # submitted profile is rendered back as-is without strict validation. - profile = request.get_json(force=True, silent=True) or request.form.to_dict() + profile = _json_body() or request.form.to_dict() return render_template('account.html', **profile) @@ -141,8 +152,7 @@ def logout(): def create(): item = request.form.get('content') if item is None: - body = request.get_json(force=True, silent=True) or {} - item = body.get('content') + item = _json_body().get('content') if item is None: # Missing content: mirror the original, which errored out instead of @@ -197,8 +207,7 @@ def edit(id): def update(id): content = request.form.get('content') if content is None: - body = request.get_json(force=True, silent=True) or {} - content = body.get('content') + content = _json_body().get('content') todos.update_one( {'_id': ObjectId(id)}, @@ -284,7 +293,8 @@ def about_new(): def find_user(auth): - auth = auth or {} + if not isinstance(auth, dict): + auth = {} for u in users_chat: if u.get('name') == auth.get('name') and u.get('password') == auth.get('password'): return u @@ -314,8 +324,8 @@ def chat_get(): @main_bp.route('/chat', methods=['PUT']) def chat_add(): - body = request.get_json(force=True, silent=True) or {} - user = find_user(body.get('auth') or {}) + body = _json_body() + user = find_user(body.get('auth')) if not user: return jsonify({'ok': False, 'error': 'Access denied'}), 403 @@ -325,7 +335,12 @@ def chat_add(): 'icon': '\U0001f44b', } - deep_merge(message, body.get('message') or {}) + # Only merge object messages (mirrors lodash.merge, which ignores non-object + # sources). A dict here still flows through unfiltered -> prototype-pollution + # analogue is preserved. + msg_source = body.get('message') + if isinstance(msg_source, dict): + deep_merge(message, msg_source) with _chat_lock: deep_merge(message, { @@ -342,8 +357,8 @@ def chat_add(): @main_bp.route('/chat', methods=['DELETE']) def chat_delete(): global messages - body = request.get_json(force=True, silent=True) or {} - user = find_user(body.get('auth') or {}) + body = _json_body() + user = find_user(body.get('auth')) if not user or not user.get('canDelete'): return jsonify({'ok': False, 'error': 'Access denied'}), 403 From 749ac4ad40b94b9ab8bfce239ddf8e0fb5c968c1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 06:55:17 +0000 Subject: [PATCH 5/6] bug: match JS create() response body and lock chat_get snapshot - create() now sends the plain todo content as the 302 body instead of base64-encoding it. The original JS uses todo.content.toString('base64'), which is String.prototype.toString (ignores the 'base64' arg) and returns the string unchanged; only Buffer.prototype.toString encodes. Dropped the now-unused base64 import. - chat_get snapshots messages under _chat_lock before serializing, matching the locking used by chat_add/chat_delete and avoiding a data race. Co-Authored-By: Eashan Sinha --- routes/__init__.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/routes/__init__.py b/routes/__init__.py index 430fc89b64a..a944e632eb2 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -5,7 +5,6 @@ injection, zip slip, SSTI, prototype-pollution analogue) are PRESERVED on purpose for security education. Do NOT add sanitization/validation/escaping. """ -import base64 import datetime import functools import io @@ -177,8 +176,10 @@ def create(): 'updated_at': datetime.datetime.utcnow(), }) - content_b64 = base64.b64encode(str(item).encode()).decode() - resp = make_response(content_b64, 302) + # Mirror the original JS: ``todo.content.toString('base64')`` calls + # String.prototype.toString, which ignores the 'base64' argument and returns + # the plain string unchanged (only Buffer.prototype.toString encodes). + resp = make_response(str(item), 302) resp.headers['Location'] = '/' return resp @@ -319,7 +320,9 @@ def deep_merge(target, source): @main_bp.route('/chat', methods=['GET']) def chat_get(): - return jsonify(messages) + with _chat_lock: + snapshot = list(messages) + return jsonify(snapshot) @main_bp.route('/chat', methods=['PUT']) From 837c96c07b8e80b8f647bcdedb0eb104a2aad977 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Jun 2026 07:03:59 +0000 Subject: [PATCH 6/6] bug: support form-encoded login and match JS zip-import backup.txt no-op - login_handler now reads JSON first then falls back to request.form, so the HTML login form (application/x-www-form-urlencoded) works again. app.js registers both bodyParser.json() and urlencoded(), so the original req.body accepted either; the JSON-first order preserves the NoSQL-injection path. - import: drop the CWD backup.txt read. The original JS callback does 'data = data' (a no-op via param shadowing) and fs.readFile is async, so data always stays 'No backup.txt file found'. Reading CWD backup.txt diverged. Co-Authored-By: Eashan Sinha --- routes/__init__.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/routes/__init__.py b/routes/__init__.py index a944e632eb2..6a099fbd9ec 100644 --- a/routes/__init__.py +++ b/routes/__init__.py @@ -94,7 +94,11 @@ def login(): @main_bp.route('/login', methods=['POST']) def login_handler(): - body = _json_body() + # Read JSON first so the NoSQL-injection path is preserved (the body can hold + # Mongo operators like {"$gt": ""}); fall back to the urlencoded form so the + # HTML login form works too. app.js registers both bodyParser.json() and + # urlencoded(), so the original req.body accepted either content type. + body = _json_body() or request.form.to_dict() # NoSQL INJECTION (intentional): the raw, unsanitized request body is # passed straight into the Mongo query, allowing operator injection such @@ -231,12 +235,11 @@ def import_todos(): # validation, allowing writes outside the target directory. extracted_path = '/tmp/extracted_files' zipfile.ZipFile(io.BytesIO(file_bytes)).extractall(extracted_path) + # Mirror the original JS exactly: it sets this fallback string and then the + # async fs.readFile callback does `data = data` (a no-op, because the + # callback parameter shadows the outer variable), so backup.txt's contents + # are never actually used -- data always stays this literal. data = 'No backup.txt file found' - try: - with open('backup.txt', 'r') as f: - data = f.read() - except OSError: - pass else: data = file_bytes.decode('ascii', errors='replace')