-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgemini_processor.py
More file actions
475 lines (400 loc) · 17.3 KB
/
gemini_processor.py
File metadata and controls
475 lines (400 loc) · 17.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
"""
Gemini Processor — Batch Photo Analysis via Google Gemini API
Processes photos captured during exploration sprints while Vector charges.
Falls back to a local Ollama vision model (Gemma 4) if Gemini is unavailable.

Usage:
    processor = GeminiProcessor()
    if processor.available:
        result = processor.analyze_photo(image_path, metadata)
        mission = processor.plan_next_mission(room_knowledge)
"""
import os
import io
import json
import time
import base64
from pathlib import Path
from datetime import datetime
from logger import logger
# Gemini API
try:
import google.generativeai as genai
_GENAI_AVAILABLE = True
except ImportError:
_GENAI_AVAILABLE = False
logger.log("WARNING", "google-generativeai not installed — Gemini disabled")
# Ollama fallback
try:
import ollama
_OLLAMA_AVAILABLE = True
except ImportError:
_OLLAMA_AVAILABLE = False
# PIL for image handling
try:
from PIL import Image
except ImportError:
Image = None
# ── Config ──────────────────────────────────────────────
# Model name handed to genai.GenerativeModel() for the cloud backend.
GEMINI_MODEL = "gemini-2.0-flash"
# Model name handed to ollama.chat() for the local fallback backend.
OLLAMA_VISION_MODEL = "gemma4:e4b" # Updated: Gemma 4 4B vision (replaces llava)
# Per-photo analysis prompt template. It is filled with str.format() using the
# keys position, rotation, tilt and distance_mm; the doubled braces ({{ }})
# are format-escapes that render as the literal braces of the JSON example.
ROOM_ANALYSIS_PROMPT = """You are a robot's spatial analysis system. Analyze this photo taken from a small desk robot's perspective.
Photo context:
- Position index: {position}
- Rotation from home: {rotation}°
- Head tilt: {tilt}
- ToF distance to nearest object: {distance_mm}mm
Respond ONLY in this JSON format (no markdown, no extra text):
{{
"scene_description": "One sentence describing the overall scene",
"objects": ["list", "of", "identifiable", "objects"],
"landmarks": ["distinctive features useful for navigation"],
"spatial_layout": "Description of what's to the left, center, and right",
"surfaces": ["floor type", "wall color/texture", "ceiling if visible"],
"doorways_or_openings": ["any exits, hallways, or room transitions"],
"coverage_notes": "What areas are NOT visible and might need another angle"
}}"""
# Mission-planning prompt template. It is filled with str.format() using the
# keys num_sprints, knowledge_summary and gaps; doubled braces are literal
# braces in the rendered prompt, as above.
MISSION_PLAN_PROMPT = """You are a navigation planner for a small desk robot (Anki Vector).
The robot has completed {num_sprints} exploration sprint(s) and gathered the following room knowledge:
{knowledge_summary}
Coverage gaps identified:
{gaps}
Based on this data, plan the robot's next exploration sprint. The robot:
- Starts from its charger
- Can drive straight, turn in place, and take photos
- Has ~12 minutes of battery per sprint
- Should prioritize unexplored areas
Respond ONLY in this JSON format:
{{
"priority_direction": "The compass direction or relative direction to explore first",
"reason": "Why this area needs exploration",
"strategy": "Step-by-step movement plan (up to 8 positions)",
"expected_discoveries": "What the robot might find based on adjacent areas",
"estimated_positions": 5
}}"""
class GeminiProcessor:
    """Batch photo analysis via the Gemini API with a local Ollama fallback.

    Gemini (cloud) is tried first. If it cannot be initialised, the processor
    falls back to a Gemma 4 vision model served by a local Ollama instance.
    ``available`` stays False when neither backend could be set up, and
    ``using_gemini`` tells which of the two backends is active.
    """

    # Fallback locations to find the API key
    _KEY_FILE = Path(__file__).parent / ".gemini_key"
    _START_SCRIPT = Path(__file__).parent / "start_dashboard.sh"

    # Rotations (degrees) that together count as full coverage of a position.
    _FULL_ROTATIONS = frozenset({0, 90, 180, 270})

    def __init__(self):
        """Resolve the API key and select the first backend that works."""
        self.api_key = self._resolve_api_key()
        self.model = None
        self.available = False
        self.using_gemini = False
        self._stats = {
            "photos_processed": 0,
            "total_time_sec": 0,
            "errors": 0,
            "source": "none",
        }

        # Try Gemini (cloud — no RAM cost)
        if _GENAI_AVAILABLE and self.api_key:
            try:
                genai.configure(api_key=self.api_key)
                self.model = genai.GenerativeModel(GEMINI_MODEL)
                # Quick validation: a cheap call that fails on a bad key/model.
                self.model.count_tokens("test")
                self.available = True
                self.using_gemini = True
                self._stats["source"] = "gemini"
                logger.log("GEMINI", f"✅ Gemini API ready (model: {GEMINI_MODEL})")
            except Exception as e:
                logger.log("WARNING", f"Gemini init failed: {e}")
                self.model = None
        elif not _GENAI_AVAILABLE:
            logger.log("WARNING", "google-generativeai package not installed")
        elif not self.api_key:
            logger.log("WARNING", "GEMINI_API_KEY not found in env, .gemini_key, or start_dashboard.sh")

        # Gemma 4:e4b fallback — 4B vision model via local Ollama (M4 24GB can handle it)
        if not self.available and _OLLAMA_AVAILABLE:
            try:
                models = [m["model"] for m in ollama.list().get("models", [])]
                if any("gemma4" in m for m in models):
                    self.available = True
                    self.using_gemini = False
                    self._stats["source"] = "ollama_gemma4"
                    logger.log("GEMINI", f"✅ Ollama Gemma 4 ready (model: {OLLAMA_VISION_MODEL})")
            except Exception as e:
                logger.log("WARNING", f"Ollama Gemma 4 check failed: {e}")

        if not self.available:
            logger.log("WARNING", "Vision AI unavailable — set GEMINI_API_KEY or pull gemma4:e4b")

    def _resolve_api_key(self) -> str:
        """Find the API key from multiple sources (env var → file → script).

        Returns:
            The key, or "" when no source yields one. Unreadable files are
            logged and skipped so that construction never fails here.
        """
        import re

        # 1. Environment variable (normal path)
        key = os.environ.get("GEMINI_API_KEY", "").strip()
        if key:
            return key

        # 2. Dedicated key file
        if self._KEY_FILE.exists():
            try:
                key = self._KEY_FILE.read_text().strip()
            except (OSError, UnicodeError) as e:
                logger.log("WARNING", f"Could not read .gemini_key: {e}")
                key = ""
            if key:
                logger.log("GEMINI", "🔑 Loaded API key from .gemini_key file")
                return key

        # 3. Parse from start_dashboard.sh (emergency fallback)
        if self._START_SCRIPT.exists():
            try:
                text = self._START_SCRIPT.read_text()
            except (OSError, UnicodeError) as e:
                logger.log("WARNING", f"Could not read start_dashboard.sh: {e}")
            else:
                match = re.search(r'GEMINI_API_KEY[="\s]+(AIza[A-Za-z0-9_-]+)', text)
                if match:
                    logger.log("GEMINI", "🔑 Loaded API key from start_dashboard.sh")
                    return match.group(1)

        return ""

    @staticmethod
    def _as_str_list(value) -> list:
        """Coerce a model-supplied field into a list of strings.

        Models occasionally return ``null``, a bare string, or non-string
        items where a list of strings was requested.
        """
        if isinstance(value, str):
            return [value]
        if not isinstance(value, (list, tuple, set)):
            return []
        return [str(item) for item in value if item is not None]

    # ── Single Photo Analysis ─────────────────────────────
    def analyze_photo(self, image_path: str, metadata: dict = None) -> dict:
        """Analyze a single photo with Gemini or the Ollama fallback.

        Args:
            image_path: path to JPEG file
            metadata: dict with position, rotation_deg, head_tilt, distance_mm

        Returns:
            dict with scene_description, objects, landmarks, spatial_layout, etc.
            On any failure an "empty" result carrying an ``error`` key is
            returned instead of raising.
        """
        if not self.available:
            return self._empty_result("AI not available")
        # Only the Gemini path needs PIL; the Ollama path sends raw file bytes.
        if self.using_gemini and Image is None:
            return self._empty_result("AI not available")

        meta = metadata or {}
        prompt = ROOM_ANALYSIS_PROMPT.format(
            position=meta.get("position", "?"),
            rotation=meta.get("rotation_deg", "?"),
            tilt=meta.get("head_tilt", "?"),
            distance_mm=meta.get("distance_mm", "?"),
        )

        start = time.perf_counter()
        try:
            if self.using_gemini:
                result = self._gemini_analyze(image_path, prompt)
            else:
                result = self._ollama_analyze(image_path, prompt)
            elapsed = time.perf_counter() - start
            self._stats["photos_processed"] += 1
            self._stats["total_time_sec"] += elapsed
            object_count = len(self._as_str_list(result.get("objects")))
            logger.log("GEMINI",
                f"📸 Analyzed {Path(image_path).name} in {elapsed:.1f}s "
                f"→ {object_count} objects"
            )
            return result
        except Exception as e:
            # Best-effort boundary: one bad photo must not abort a batch.
            self._stats["errors"] += 1
            logger.log("ERROR", f"Photo analysis failed: {e}")
            return self._empty_result(str(e))

    def _gemini_analyze(self, image_path: str, prompt: str) -> dict:
        """Analyze with the Gemini API; the image file is closed afterwards."""
        with Image.open(image_path) as img:
            # Resize if too large (Gemini has limits)
            max_dim = 1024
            if max(img.size) > max_dim:
                img.thumbnail((max_dim, max_dim))
            response = self.model.generate_content([prompt, img])
        return self._parse_json_response(response.text)

    def _ollama_analyze(self, image_path: str, prompt: str) -> dict:
        """Analyze with the local Ollama model named by OLLAMA_VISION_MODEL."""
        # Ollama takes images as base64 strings.
        with open(image_path, "rb") as f:
            img_b64 = base64.b64encode(f.read()).decode()
        response = ollama.chat(
            model=OLLAMA_VISION_MODEL,
            messages=[{
                "role": "user",
                "content": prompt,
                "images": [img_b64],
            }],
        )
        return self._parse_json_response(response["message"]["content"])

    # ── Batch Processing ──────────────────────────────────
    def process_batch(self, session_dir: str, progress_callback=None) -> dict:
        """Process all photos in a scan session directory.

        Each ``*.jpg`` is analyzed together with its sibling ``.json``
        metadata file (if any), and the analysis summary is written back
        into that metadata file.

        Args:
            session_dir: path to session folder with .jpg + .json files
            progress_callback: optional fn(processed, total, result) called per photo

        Returns:
            dict with all_objects, photo_analyses, landmarks, coverage_gaps
        """
        session = Path(session_dir)
        photo_files = sorted(session.glob("*.jpg"))
        if not photo_files:
            return {"error": "No photos found", "all_objects": [], "photo_analyses": []}

        all_objects = set()
        all_landmarks = set()
        analyses = []
        coverage = {}  # position → set of rotations covered

        for i, photo_path in enumerate(photo_files):
            # Load metadata; anything unreadable or not a JSON object is
            # treated as "no metadata" rather than aborting the batch.
            meta_path = photo_path.with_suffix(".json")
            meta = {}
            if meta_path.exists():
                try:
                    loaded = json.loads(meta_path.read_text())
                except (OSError, ValueError) as e:
                    logger.log("WARNING", f"Could not read metadata {meta_path.name}: {e}")
                else:
                    if isinstance(loaded, dict):
                        meta = loaded

            # Analyze
            result = self.analyze_photo(str(photo_path), meta)

            # Collect objects and landmarks
            objects = self._as_str_list(result.get("objects"))
            landmarks = self._as_str_list(result.get("landmarks"))
            all_objects.update(obj.lower().strip() for obj in objects)
            all_landmarks.update(lm.lower().strip() for lm in landmarks)

            # Track coverage
            pos = meta.get("position", 0)
            rot = meta.get("rotation_deg", 0)
            coverage.setdefault(pos, set()).add(rot)

            # Store analysis
            analyses.append({
                "photo": photo_path.name,
                "position": pos,
                "rotation": rot,
                "tilt": meta.get("head_tilt", "level"),
                "distance_mm": meta.get("distance_mm", 0),
                **result,
            })

            # Update metadata file with analysis
            meta.update({
                "scene_description": result.get("scene_description", ""),
                "objects": objects,
                "landmarks": landmarks,
                "spatial_layout": result.get("spatial_layout", ""),
            })
            try:
                meta_path.write_text(json.dumps(meta, indent=2))
            except (OSError, TypeError, ValueError) as e:
                # Write-back is best-effort; the in-memory result is still returned.
                logger.log("WARNING", f"Could not update metadata {meta_path.name}: {e}")

            # Progress callback
            if progress_callback:
                progress_callback(i + 1, len(photo_files), result)

        # Identify coverage gaps
        gaps = self._find_coverage_gaps(coverage)

        return {
            "session": session.name,
            "timestamp": datetime.now().isoformat(),
            "total_photos": len(photo_files),
            "all_objects": sorted(all_objects),
            "object_count": len(all_objects),
            "all_landmarks": sorted(all_landmarks),
            "photo_analyses": analyses,
            "coverage": {str(k): sorted(v) for k, v in coverage.items()},
            "coverage_gaps": gaps,
            "processing_source": "gemini" if self.using_gemini else "ollama",
            "avg_time_per_photo": (
                self._stats["total_time_sec"] / max(1, self._stats["photos_processed"])
            ),
        }

    def _find_coverage_gaps(self, coverage: dict) -> list:
        """Identify which canonical angles weren't captured at each position.

        Args:
            coverage: mapping of position → iterable of rotations captured.

        Returns:
            One entry per position that misses at least one of 0/90/180/270,
            with the missing rotations and the percentage of canonical
            rotations covered. Off-grid rotations (e.g. 45) are ignored so
            the percentage can never exceed 100.
        """
        full = self._FULL_ROTATIONS
        gaps = []
        for pos, rotations in coverage.items():
            covered = full & set(rotations)
            missing = full - covered
            if missing:
                gaps.append({
                    "position": pos,
                    "missing_rotations": sorted(missing),
                    "coverage_pct": int(len(covered) / len(full) * 100),
                })
        return gaps

    # ── Mission Planning ──────────────────────────────────
    def plan_next_mission(self, room_knowledge: dict) -> dict:
        """Ask the active backend to plan the next exploration sprint.

        Args:
            room_knowledge: the room_knowledge.json content

        Returns:
            dict with priority_direction, reason, strategy, estimated_positions.
            The default plan is returned when no backend is available, the
            request fails, or the reply contains no JSON object.
        """
        if not self.available:
            return self._default_mission_plan()

        try:
            # Build summary
            objects = self._as_str_list(room_knowledge.get("all_objects"))
            gaps = room_knowledge.get("coverage_gaps", [])
            num_sprints = room_knowledge.get("sprint_count", 1)

            summary_lines = [
                f"Objects found: {', '.join(objects[:30])}",
                f"Positions explored: {room_knowledge.get('total_positions', 0)}",
                f"Photos taken: {room_knowledge.get('total_photos', 0)}",
            ]
            # Add scene descriptions
            analyses = room_knowledge.get("photo_analyses") or []
            for a in analyses[:10]:
                if not isinstance(a, dict):
                    continue
                summary_lines.append(
                    f"- Pos {a.get('position')}, {a.get('rotation')}°: {a.get('scene_description', 'N/A')}"
                )
            knowledge_summary = "".join(f"{line}\n" for line in summary_lines)

            gaps_text = json.dumps(gaps, indent=2) if gaps else "No gaps identified"
            prompt = MISSION_PLAN_PROMPT.format(
                num_sprints=num_sprints,
                knowledge_summary=knowledge_summary,
                gaps=gaps_text,
            )

            if self.using_gemini:
                response = self.model.generate_content(prompt)
                plan = self._parse_json_response(response.text)
            else:
                # Use the model whose presence __init__ actually verified.
                response = ollama.chat(
                    model=OLLAMA_VISION_MODEL,
                    messages=[{"role": "user", "content": prompt}],
                )
                plan = self._parse_json_response(response["message"]["content"])
        except Exception as e:
            logger.log("ERROR", f"Mission planning failed: {e}")
            return self._default_mission_plan()

        if plan.get("_raw"):
            # The parser's fallback is shaped like a photo analysis, not a plan.
            logger.log("WARNING", "Mission planning reply was not valid JSON — using default plan")
            return self._default_mission_plan()
        return plan

    # ── Helpers ───────────────────────────────────────────
    def _parse_json_response(self, text: str) -> dict:
        """Extract a JSON object from AI response text.

        Tries, in order: the whole text (after stripping markdown code
        fences), then the first decodable JSON object starting at any ``{``.
        Always returns a dict; when nothing decodes, the text is returned as
        ``scene_description`` with ``_raw`` set to True.
        """
        text = (text or "").strip()

        # Strip markdown code fences if present
        if text.startswith("```"):
            text = "\n".join(
                line for line in text.split("\n")
                if not line.strip().startswith("```")
            )

        # Try direct parse
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        # Try to find an embedded JSON object; raw_decode copes with any
        # nesting depth and ignores trailing prose after the object.
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            start = text.find("{", start + 1)

        # Fallback: return as description
        return {
            "scene_description": text[:500],
            "objects": [],
            "landmarks": [],
            "spatial_layout": "",
            "surfaces": [],
            "doorways_or_openings": [],
            "coverage_notes": "",
            "_raw": True,
        }

    def _empty_result(self, reason: str = "") -> dict:
        """Return an analysis-shaped dict describing why analysis was skipped."""
        return {
            "scene_description": f"Analysis unavailable: {reason}",
            "objects": [],
            "landmarks": [],
            "spatial_layout": "",
            "surfaces": [],
            "doorways_or_openings": [],
            "coverage_notes": "",
            "error": reason,
        }

    def _default_mission_plan(self) -> dict:
        """Return the fixed plan used when no AI-generated plan is available."""
        return {
            "priority_direction": "forward",
            "reason": "Default exploration — no AI planning available",
            "strategy": "Drive forward, turn 90° at each position, capture 360°",
            "estimated_positions": 5,
        }

    def get_stats(self) -> dict:
        """Get processing statistics for dashboard."""
        return {
            **self._stats,
            "using_gemini": self.using_gemini,
            "model": GEMINI_MODEL if self.using_gemini else OLLAMA_VISION_MODEL,
            "available": self.available,
            "avg_time_per_photo": (
                round(self._stats["total_time_sec"] / max(1, self._stats["photos_processed"]), 1)
            ),
        }