@startuml Crowd Node - Request Processing Flow !define COMPONENT_BG_COLOR #E3F2FD !define API_BG_COLOR #FFF3E0 !define STORAGE_BG_COLOR #F3E5F5 title Crowd Node: Процесс обработки запроса на анализ изображения participant "Scheduler" as Scheduler participant "Redis Queue" as RedisQueue #FFCCCC participant "tasks.py\n(Task Listener)" as TaskListener COMPONENT_BG_COLOR participant "analyzer.py\n(Main Analyzer)" as Analyzer COMPONENT_BG_COLOR participant "frames.py\n(Frame Puller)" as FramePuller COMPONENT_BG_COLOR participant "LivePreview service" as LivePreview API_BG_COLOR participant "TevianHeadsDetector\n(detectors/tevian.py)" as Detector COMPONENT_BG_COLOR participant "Redis Cache" as RedisCache #FFCCCC participant "Tevian Cloud API\n(ext_api/tevian_api.py)" as TevianAPI API_BG_COLOR participant "S3 Storage" as S3 STORAGE_BG_COLOR participant "Central (crowd backend)" as Central API_BG_COLOR == Получение задания == Scheduler -> RedisQueue: push task\n{cmd: "analyze_crowd",\nparams: {uin, camera_id, zones, ...}} activate RedisQueue TaskListener -> RedisQueue: pull_task() activate TaskListener RedisQueue --> TaskListener: task data deactivate RedisQueue TaskListener -> Analyzer: analyze_crowd(uin, camera_id, zones, **options) activate Analyzer == Получение кадра == Analyzer -> FramePuller: pull(uin, camera) activate FramePuller FramePuller -> LivePreview: GET /internal/preview activate LivePreview LivePreview --> FramePuller: JPEG image bytes deactivate LivePreview FramePuller -> FramePuller: Frame(content)\n- создает PIL Image\n- генерирует image_id FramePuller --> Analyzer: Frame object deactivate FramePuller == Запуск детектора == Analyzer -> Analyzer: _run_detectors(uin, camera, frame, zones) Analyzer -> Detector: request(uin, camera, frame, zones) activate Detector == Подготовка Tevian (prepare) == Detector -> Detector: prepare(cam_name, zones) note right Подготовка включает: 1. Создание/получение камеры 2. Синхронизацию очередей (зон) 3. Обновление параметров зон end note Detector -> Detector: _get_or_create_camera(cam_name) Detector -> RedisCache: get_camera(cam_name) activate RedisCache RedisCache --> Detector: TCamera or None deactivate RedisCache alt Камеры нет в кеше Detector -> TevianAPI: TCamera.get_all() activate TevianAPI TevianAPI -> TevianAPI: _refresh_token() if needed TevianAPI -> "Tevian Cloud": GET /api/cameras "Tevian Cloud" --> TevianAPI: список камер [{id, name, ...}] TevianAPI --> Detector: [TCamera, ...] deactivate TevianAPI Detector -> RedisCache: set_camera(cam) для каждой alt Камера все еще не найдена Detector -> TevianAPI: TCamera.create(cam_name) activate TevianAPI TevianAPI -> "Tevian Cloud": POST /api/cameras\n{name, rtsp, frequency_plan_id, ...} "Tevian Cloud" --> TevianAPI: {id, name, status, ...} TevianAPI --> Detector: TCamera deactivate TevianAPI Detector -> RedisCache: set_camera(cam) end end == Синхронизация очередей (зон) == Detector -> Detector: _get_camera_queues(cam) Detector -> RedisCache: get_queue(q_id)\nдля каждого queues_ids камеры RedisCache --> Detector: TQueue objects loop Для каждой зоны из запроса Detector -> Detector: Конвертировать координаты\nв относительные (0..1) alt Очередь не найдена Detector -> TevianAPI: TQueue.create(cam_id, zone_id, polygon, min_head_size) activate TevianAPI TevianAPI -> "Tevian Cloud": POST /api/queues\n{name, camera_id, roi_polygon_relative, ...} "Tevian Cloud" --> TevianAPI: {id, name, camera_id, ...} TevianAPI --> Detector: TQueue deactivate TevianAPI Detector -> RedisCache: set_queue(queue) else Параметры зоны изменились Detector -> TevianAPI: queue.save() activate TevianAPI TevianAPI -> "Tevian Cloud": POST /api/queues/{id}\n{roi_polygon_relative, ...} "Tevian Cloud" --> TevianAPI: updated queue TevianAPI --> Detector: success deactivate TevianAPI Detector -> RedisCache: set_queue(queue) end end loop Для старых очередей (не в списке зон) Detector -> TevianAPI: TQueue.delete_by_id(queue_id) activate TevianAPI TevianAPI -> "Tevian Cloud": DELETE /api/queues/{id} "Tevian Cloud" --> TevianAPI: success TevianAPI --> Detector: success deactivate TevianAPI Detector -> RedisCache: delete_queue(queue_id) end Detector -> TevianAPI: cam.refresh() note right Обновляем состояние камеры после изменения очередей end note activate TevianAPI TevianAPI -> "Tevian Cloud": GET /api/cameras/{id} "Tevian Cloud" --> TevianAPI: {status, is_accepting_snapshots, ...} TevianAPI --> Detector: updated TCamera deactivate TevianAPI Detector -> RedisCache: set_camera(cam) == Отправка снапшота и получение результатов == Detector -> Detector: Проверка rate limiting\n(FORCED_WAIT_PERIOD) note right Избегаем HTTP 429: Too Many Requests Ждем если запрос слишком частый end note alt Слишком частые запросы Detector -> Detector: asyncio.sleep(wait_for) end Detector -> TevianAPI: cam.send_snapshot(frame.data) activate TevianAPI TevianAPI -> "Tevian Cloud": POST /api/cameras/{id}/snapshots\nContent-Type: image/jpeg\nbody: "Tevian Cloud" --> TevianAPI: {snapshot_accepted_at: } TevianAPI --> Detector: timestamp deactivate TevianAPI Detector -> Detector: asyncio.sleep(TEVIAN_RECOGNITION_DELAY)\n(рекомендуется 12 сек) Detector -> TevianAPI: TRecognition.get_many(queues_ids, timestamp) activate TevianAPI loop Polling до получения результатов или timeout TevianAPI -> "Tevian Cloud": GET /api/recognitions?\nqueues_ids={ids}&utc_timestamp={ts} "Tevian Cloud" --> TevianAPI: [recognitions...] alt Результатов меньше чем очередей TevianAPI -> TevianAPI: await gen.sleep(2)\nи повторить else Все результаты получены TevianAPI -> TevianAPI: break end end TevianAPI -> TevianAPI: Фильтровать detections:\nоставить только\nfiltered_status == 'passed_filters' TevianAPI --> Detector: [TRecognition, ...] deactivate TevianAPI == Форматирование результата == loop Для каждого recognition Detector -> RedisCache: get_queue(rec.queue_id) RedisCache --> Detector: TQueue Detector -> Detector: Форматировать objects:\n[{x, y, w, h}, ...]\nиз bbox данных Detector -> Detector: result[queue.name] = {\n 'count': len(objects),\n 'objects': objects\n} end Detector --> Analyzer: result dict\n{zone_id: {count, objects}, ...} deactivate Detector Analyzer -> Analyzer: _build_zones_info(zones, detected_values) note right Объединяет данные зон с результатами детекторов, определяет length_by_ai end note Analyzer -> Analyzer: _get_triggered_zones(zones_info, timestamp) note right Определяет зоны для подсветки на основе trigger_at и trigger_type end note == Сохранение результата == Analyzer -> Analyzer: frame.draw_zones(triggered_zones) note right Рисует полигоны триггерных зон на изображении с прозрачностью end note Analyzer -> Analyzer: resize_image(image, 640) Analyzer -> S3: storage.upload_fileobj(\n image, bucket, key, ...) activate S3 S3 --> Analyzer: ObjRef deactivate S3 Analyzer -> S3: storage.generate_presigned_url(obj_ref) activate S3 S3 --> Analyzer: presigned_url deactivate S3 Analyzer -> Analyzer: Удалить query params\nиз URL (сделать публичным) == Отправка результата в Central == Analyzer -> Central: central.send('new_measurement', {\n timestamp,\n camera_id,\n measurement_id,\n image: zones_url,\n timings,\n errors,\n zones: zones_info\n}) activate Central note right Отправка через aio_broker в очередь 'overmind:input' с командой 'new_measurement' end note Central --> Analyzer: (async, no wait) deactivate Central Analyzer -> Analyzer: Обновить БД zones_db:\ndetected_at = time.time() Analyzer --> TaskListener: complete deactivate Analyzer TaskListener -> TaskListener: Ожидать следующую задачу deactivate TaskListener @enduml