sandbox/ivideon/puml/Crowd/Tevian/crowd_tevian_now_simplified.puml
2026-02-13 17:36:23 +03:00

249 lines
8.8 KiB
Text
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

@startuml Crowd Node - Request Processing Flow
!define COMPONENT_BG_COLOR #E3F2FD
!define API_BG_COLOR #FFF3E0
!define STORAGE_BG_COLOR #F3E5F5
title Crowd Node: Процесс обработки запроса на анализ изображения
participant "Scheduler" as Scheduler
participant "Redis Queue" as RedisQueue #FFCCCC
participant "tasks.py\n(Task Listener)" as TaskListener COMPONENT_BG_COLOR
participant "analyzer.py\n(Main Analyzer)" as Analyzer COMPONENT_BG_COLOR
participant "frames.py\n(Frame Puller)" as FramePuller COMPONENT_BG_COLOR
participant "LivePreview service" as LivePreview API_BG_COLOR
participant "TevianHeadsDetector\n(detectors/tevian.py)" as Detector COMPONENT_BG_COLOR
participant "Redis Cache" as RedisCache #FFCCCC
participant "Tevian Cloud API\n(ext_api/tevian_api.py)" as TevianAPI API_BG_COLOR
participant "S3 Storage" as S3 STORAGE_BG_COLOR
participant "Central (crowd backend)" as Central API_BG_COLOR
== Получение задания ==
Scheduler -> RedisQueue: push task\n{cmd: "analyze_crowd",\nparams: {uin, camera_id, zones, ...}}
activate RedisQueue
TaskListener -> RedisQueue: pull_task()
activate TaskListener
RedisQueue --> TaskListener: task data
deactivate RedisQueue
TaskListener -> Analyzer: analyze_crowd(uin, camera_id, zones, **options)
activate Analyzer
== Получение кадра ==
Analyzer -> FramePuller: pull(uin, camera)
activate FramePuller
FramePuller -> LivePreview: GET /internal/preview
activate LivePreview
LivePreview --> FramePuller: JPEG image bytes
deactivate LivePreview
FramePuller -> FramePuller: Frame(content)\n- создает PIL Image\n- генерирует image_id
FramePuller --> Analyzer: Frame object
deactivate FramePuller
== Запуск детектора ==
Analyzer -> Analyzer: _run_detectors(uin, camera, frame, zones)
Analyzer -> Detector: request(uin, camera, frame, zones)
activate Detector
== Подготовка Tevian (prepare) ==
Detector -> Detector: prepare(cam_name, zones)
note right
Подготовка включает:
1. Создание/получение камеры
2. Синхронизацию очередей (зон)
3. Обновление параметров зон
end note
Detector -> Detector: _get_or_create_camera(cam_name)
Detector -> RedisCache: get_camera(cam_name)
activate RedisCache
RedisCache --> Detector: TCamera or None
deactivate RedisCache
alt Камеры нет в кеше
Detector -> TevianAPI: TCamera.get_all()
activate TevianAPI
TevianAPI -> TevianAPI: _refresh_token() if needed
TevianAPI -> "Tevian Cloud": GET /api/cameras
"Tevian Cloud" --> TevianAPI: список камер [{id, name, ...}]
TevianAPI --> Detector: [TCamera, ...]
deactivate TevianAPI
Detector -> RedisCache: set_camera(cam) для каждой
alt Камера все еще не найдена
Detector -> TevianAPI: TCamera.create(cam_name)
activate TevianAPI
TevianAPI -> "Tevian Cloud": POST /api/cameras\n{name, rtsp, frequency_plan_id, ...}
"Tevian Cloud" --> TevianAPI: {id, name, status, ...}
TevianAPI --> Detector: TCamera
deactivate TevianAPI
Detector -> RedisCache: set_camera(cam)
end
end
== Синхронизация очередей (зон) ==
Detector -> Detector: _get_camera_queues(cam)
Detector -> RedisCache: get_queue(q_id)\nдля каждого queues_ids камеры
RedisCache --> Detector: TQueue objects
loop Для каждой зоны из запроса
Detector -> Detector: Конвертировать координаты\nв относительные (0..1)
alt Очередь не найдена
Detector -> TevianAPI: TQueue.create(cam_id, zone_id, polygon, min_head_size)
activate TevianAPI
TevianAPI -> "Tevian Cloud": POST /api/queues\n{name, camera_id, roi_polygon_relative, ...}
"Tevian Cloud" --> TevianAPI: {id, name, camera_id, ...}
TevianAPI --> Detector: TQueue
deactivate TevianAPI
Detector -> RedisCache: set_queue(queue)
else Параметры зоны изменились
Detector -> TevianAPI: queue.save()
activate TevianAPI
TevianAPI -> "Tevian Cloud": POST /api/queues/{id}\n{roi_polygon_relative, ...}
"Tevian Cloud" --> TevianAPI: updated queue
TevianAPI --> Detector: success
deactivate TevianAPI
Detector -> RedisCache: set_queue(queue)
end
end
loop Для старых очередей (не в списке зон)
Detector -> TevianAPI: TQueue.delete_by_id(queue_id)
activate TevianAPI
TevianAPI -> "Tevian Cloud": DELETE /api/queues/{id}
"Tevian Cloud" --> TevianAPI: success
TevianAPI --> Detector: success
deactivate TevianAPI
Detector -> RedisCache: delete_queue(queue_id)
end
Detector -> TevianAPI: cam.refresh()
note right
Обновляем состояние камеры
после изменения очередей
end note
activate TevianAPI
TevianAPI -> "Tevian Cloud": GET /api/cameras/{id}
"Tevian Cloud" --> TevianAPI: {status, is_accepting_snapshots, ...}
TevianAPI --> Detector: updated TCamera
deactivate TevianAPI
Detector -> RedisCache: set_camera(cam)
== Отправка снапшота и получение результатов ==
Detector -> Detector: Проверка rate limiting\n(FORCED_WAIT_PERIOD)
note right
Избегаем HTTP 429: Too Many Requests
Ждем если запрос слишком частый
end note
alt Слишком частые запросы
Detector -> Detector: asyncio.sleep(wait_for)
end
Detector -> TevianAPI: cam.send_snapshot(frame.data)
activate TevianAPI
TevianAPI -> "Tevian Cloud": POST /api/cameras/{id}/snapshots\nContent-Type: image/jpeg\nbody: <JPEG bytes>
"Tevian Cloud" --> TevianAPI: {snapshot_accepted_at: <timestamp>}
TevianAPI --> Detector: timestamp
deactivate TevianAPI
Detector -> Detector: asyncio.sleep(TEVIAN_RECOGNITION_DELAY)\n(рекомендуется 12 сек)
Detector -> TevianAPI: TRecognition.get_many(queues_ids, timestamp)
activate TevianAPI
loop Polling до получения результатов или timeout
TevianAPI -> "Tevian Cloud": GET /api/recognitions?\nqueues_ids={ids}&utc_timestamp={ts}
"Tevian Cloud" --> TevianAPI: [recognitions...]
alt Результатов меньше чем очередей
TevianAPI -> TevianAPI: await gen.sleep(2)\nи повторить
else Все результаты получены
TevianAPI -> TevianAPI: break
end
end
TevianAPI -> TevianAPI: Фильтровать detections:\nоставить только\nfiltered_status == 'passed_filters'
TevianAPI --> Detector: [TRecognition, ...]
deactivate TevianAPI
== Форматирование результата ==
loop Для каждого recognition
Detector -> RedisCache: get_queue(rec.queue_id)
RedisCache --> Detector: TQueue
Detector -> Detector: Форматировать objects:\n[{x, y, w, h}, ...]\nиз bbox данных
Detector -> Detector: result[queue.name] = {\n 'count': len(objects),\n 'objects': objects\n}
end
Detector --> Analyzer: result dict\n{zone_id: {count, objects}, ...}
deactivate Detector
Analyzer -> Analyzer: _build_zones_info(zones, detected_values)
note right
Объединяет данные зон с результатами
детекторов, определяет length_by_ai
end note
Analyzer -> Analyzer: _get_triggered_zones(zones_info, timestamp)
note right
Определяет зоны для подсветки
на основе trigger_at и trigger_type
end note
== Сохранение результата ==
Analyzer -> Analyzer: frame.draw_zones(triggered_zones)
note right
Рисует полигоны триггерных зон
на изображении с прозрачностью
end note
Analyzer -> Analyzer: resize_image(image, 640)
Analyzer -> S3: storage.upload_fileobj(\n image, bucket, key, ...)
activate S3
S3 --> Analyzer: ObjRef
deactivate S3
Analyzer -> S3: storage.generate_presigned_url(obj_ref)
activate S3
S3 --> Analyzer: presigned_url
deactivate S3
Analyzer -> Analyzer: Удалить query params\nиз URL (сделать публичным)
== Отправка результата в Central ==
Analyzer -> Central: central.send('new_measurement', {\n timestamp,\n camera_id,\n measurement_id,\n image: zones_url,\n timings,\n errors,\n zones: zones_info\n})
activate Central
note right
Отправка через aio_broker
в очередь 'overmind:input'
с командой 'new_measurement'
end note
Central --> Analyzer: (async, no wait)
deactivate Central
Analyzer -> Analyzer: Обновить БД zones_db:\ndetected_at = time.time()
Analyzer --> TaskListener: complete
deactivate Analyzer
TaskListener -> TaskListener: Ожидать следующую задачу
deactivate TaskListener
@enduml