-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecision_maker.py
More file actions
279 lines (229 loc) · 9.53 KB
/
decision_maker.py
File metadata and controls
279 lines (229 loc) · 9.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
from uagents import Agent, Context, Model, Protocol
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import json
import os
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CallbackQueryHandler, ContextTypes
from telegram.ext import ApplicationBuilder, CallbackQueryHandler
# Message models
class AccidentAnalysis(Model):
alert: bool
context: str
class GeoInfo(Model):
patient_address: str # Full address of the patient
emergency_contact: str # Phone or contact info
nearest_hospital: str # Name/address of nearest hospital
estimated_travel_time: int # minutes to reach patient
coordinates: Dict[str, float] # {"latitude": x, "longitude": y}
class UserResponse(Model):
responded: bool
response_time: datetime
class EmergencyNotification(Model):
timestamp: str
location: str
patient_id: str
vital_signs: Dict
context: str
nearest_hospital: str
estimated_arrival: int
class UserNotification(Model):
timestamp: str
alert_type: str
message: str
requires_response: bool
# Create protocols
emergency_protocol = Protocol("emergency-decision", "0.1.0")
# Constants
ALERT_THRESHOLD_TIME = 300 # 5 minutes in seconds
USER_RESPONSE_TIMEOUT = 300 # 5 minutes in seconds
EMERGENCY_SERVICE_ADDRESS = "emergency_service_address_here"
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
USER_CHAT_ID = os.getenv("USER_CHAT_ID")
EMERGENCY_CONTACT_CHAT_ID = os.getenv("EMERGENCY_CONTACT_CHAT_ID")
LLMCOMMUNICATOR_ADDRESS = os.getenv("LLMCOMMUNICATOR_ADDRESS")
# Create decision maker agent
decision_maker = Agent(name="decision-maker",
seed="decision-maker-seed",
)
# Create a global variable for the Telegram application
telegram_app = None
# Add this function to initialize Telegram bot once
async def setup_telegram_bot():
global telegram_app
if telegram_app is None:
telegram_app = ApplicationBuilder().token(TELEGRAM_BOT_TOKEN).build()
# Setup callback handler
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
response = UserResponse(
responded=True,
response_time=datetime.now(),
is_ok=query.data == 'ok'
)
# Update the message to show response received
await query.edit_message_text(
text=f"Response received: {'OK' if query.data == 'ok' else 'Help needed'}"
)
# Get decision maker agent and send response
await telegram_app.bot.send_message(
chat_id=query.message.chat_id,
text="Your response has been recorded."
)
telegram_app.add_handler(CallbackQueryHandler(button_callback))
await telegram_app.initialize()
await telegram_app.start()
# Store for tracking alert states
@decision_maker.on_event("startup")
async def startup(ctx: Context):
ctx.logger.info("Decision maker agent starting up")
ctx.logger.info(f"Agent address: {ctx.agent.address}")
ctx.storage.set('alert_status', {
'first_alert_time': None,
'consecutive_alerts': 0,
'user_notified': False,
'user_responded': False,
})
# Handle incoming accident analysis
@emergency_protocol.on_message(model=AccidentAnalysis)
async def handle_analysis(ctx: Context, sender: str, msg: AccidentAnalysis):
ctx.logger.info(f"Received analysis from {sender}: Alert={msg.alert}")
status = ctx.storage.get('alert_status')
current_time = datetime.now()
if msg.alert:
if status['first_alert_time'] is None:
# Store datetime as ISO format string
status['first_alert_time'] = current_time.isoformat()
status['consecutive_alerts'] += 1
# If we haven't notified user yet and have multiple alerts
if not status['user_notified'] and status['consecutive_alerts'] >= 2:
await notify_user(ctx)
status['user_notified'] = True
# Check if conditions for emergency services are met
if should_contact_emergency(status):
await notify_emergency_services(ctx, msg.context)
else:
# Reset alert status if no alert
status = {
'first_alert_time': None,
'consecutive_alerts': 0,
'user_notified': False,
'user_responded': False,
}
ctx.storage.set('alert_status', status)
# Handle geolocation updates
@emergency_protocol.on_message(model=GeoInfo)
async def handle_traffic(ctx: Context, sender: str, msg: GeoInfo):
# Store the entire geo_info for use in emergency notifications
ctx.storage.set('geo_info', {
'patient_address': msg.patient_address,
'emergency_contact': msg.emergency_contact,
'nearest_hospital': msg.nearest_hospital,
'estimated_travel_time': msg.estimated_travel_time,
'coordinates': msg.coordinates
})
ctx.logger.info(f"Stored new location information: {msg.patient_address}")
# Handle user responses
@emergency_protocol.on_message(model=UserResponse)
async def handle_user_response(ctx: Context, sender: str, msg: UserResponse):
status = ctx.storage.get('alert_status')
status['user_responded'] = msg.responded
ctx.storage.set('alert_status', status)
async def notify_user(ctx: Context):
"""Send Telegram notification to user with response buttons"""
global telegram_app
# Setup bot if not already setup
if telegram_app is None:
await setup_telegram_bot()
keyboard = [
[
InlineKeyboardButton("I'm OK", callback_data='ok'),
InlineKeyboardButton("Need Help", callback_data='help')
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
try:
# Send alert message with buttons
await telegram_app.bot.send_message(
chat_id=USER_CHAT_ID,
text="🚨 Possible emergency detected! Are you OK?",
reply_markup=reply_markup
)
ctx.logger.info("Telegram notification sent with response buttons")
except Exception as e:
ctx.logger.error(f"Failed to send Telegram notification: {e}")
# Add shutdown handler for cleanup
@decision_maker.on_event("shutdown")
async def shutdown(ctx: Context):
global telegram_app
if telegram_app:
await telegram_app.stop()
async def notify_emergency_services(ctx: Context, context: str):
"""Send notification to emergency services and emergency contact"""
geo_info = ctx.storage.get('geo_info') # Store from latest GeoInfo update
if not geo_info:
ctx.logger.error("No location information available!")
return
# Prepare emergency notification
notification = EmergencyNotification(
timestamp=datetime.now().isoformat(),
location=geo_info.patient_address,
patient_id="USER_ID_HERE",
vital_signs={}, # This would be filled with actual vital signs
context=context,
nearest_hospital=geo_info.nearest_hospital,
estimated_arrival=geo_info.estimated_travel_time
)
try:
# Send to emergency services
await ctx.send(EMERGENCY_SERVICE_ADDRESS, notification)
ctx.logger.warning("🚨 Emergency services notified!")
# Send Telegram alert to emergency contact
emergency_message = (
f"🚨 EMERGENCY ALERT!\n\n"
f"Your emergency contact may need assistance.\n"
f"Location: {geo_info.patient_address}\n"
f"Nearest Hospital: {geo_info.nearest_hospital}\n"
f"ETA to location: {geo_info.estimated_travel_time} minutes\n\n"
f"Emergency services have been notified."
)
app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
await app.bot.send_message(
chat_id=EMERGENCY_CONTACT_CHAT_ID, # Need to add this to environment variables
text=emergency_message
)
# Reset alert status after notifications
status = ctx.storage.get('alert_status')
status.update({
'first_alert_time': None,
'consecutive_alerts': 0,
'user_notified': False,
'user_responded': False,
})
ctx.storage.set('alert_status', status)
except Exception as e:
ctx.logger.error(f"Failed to send emergency notifications: {e}")
def should_contact_emergency(status: dict) -> bool:
"""Determine if emergency services should be contacted"""
if not status['first_alert_time']:
return False
try:
# Convert stored ISO format string back to datetime
first_alert_time = datetime.fromisoformat(status['first_alert_time'])
time_difference = (datetime.now() - first_alert_time).total_seconds()
except (TypeError, ValueError):
return False
conditions = [
# Alert has been active for threshold time
time_difference >= ALERT_THRESHOLD_TIME,
# Have multiple consecutive alerts
status['consecutive_alerts'] >= 3,
# User was notified but hasn't responded in time
status['user_notified'] and not status['user_responded']
]
return all(conditions)
# Include protocol
decision_maker.include(emergency_protocol)
if __name__ == "__main__":
decision_maker.run()