252 lines
9 KiB
Text
252 lines
9 KiB
Text
@startuml Crowd Node - Request Processing Flow
|
||
|
||
!define COMPONENT_BG_COLOR #E3F2FD
|
||
!define API_BG_COLOR #FFF3E0
|
||
!define STORAGE_BG_COLOR #F3E5F5
|
||
|
||
title Crowd Node: Процесс обработки запроса на анализ изображения
|
||
|
||
actor "Scheduler/External System" as Scheduler
|
||
participant "Redis Queue" as RedisQueue #FFCCCC
|
||
participant "tasks.py\n(Task Listener)" as TaskListener COMPONENT_BG_COLOR
|
||
participant "analyzer.py\n(Main Analyzer)" as Analyzer COMPONENT_BG_COLOR
|
||
participant "frames.py\n(Frame Puller)" as FramePuller COMPONENT_BG_COLOR
|
||
participant "LivePreview Service" as LivePreview API_BG_COLOR
|
||
participant "TevianHeadsDetector\n(detectors/tevian.py)" as Detector COMPONENT_BG_COLOR
|
||
participant "Redis Cache" as RedisCache #FFCCCC
|
||
participant "Tevian Cloud API\n(ext_api/tevian_api.py)" as TevianAPI API_BG_COLOR
|
||
participant "S3 Storage" as S3 STORAGE_BG_COLOR
|
||
participant "Central (crowd backend)" as Central API_BG_COLOR
|
||
|
||
== Получение задания ==
|
||
|
||
Scheduler -> RedisQueue: push task\n{cmd: "analyze_crowd",\nparams: {uin, camera_id, zones, ...}}
|
||
activate RedisQueue
|
||
|
||
TaskListener -> RedisQueue: pull_task()
|
||
activate TaskListener
|
||
RedisQueue --> TaskListener: task data
|
||
deactivate RedisQueue
|
||
|
||
TaskListener -> TaskListener: dispatch to handler\nHANDLERS['analyze_crowd']
|
||
TaskListener -> Analyzer: analyze_crowd(uin, camera_id, zones, **options)
|
||
activate Analyzer
|
||
|
||
== Получение кадра ==
|
||
|
||
Analyzer -> FramePuller: pull(uin, camera)
|
||
activate FramePuller
|
||
FramePuller -> LivePreview: GET /internal/preview?\nu={uin}&camera={camera}&q=2
|
||
activate LivePreview
|
||
LivePreview --> FramePuller: JPEG image bytes
|
||
deactivate LivePreview
|
||
FramePuller -> FramePuller: Frame(content)\n- создает PIL Image\n- генерирует image_id
|
||
FramePuller --> Analyzer: Frame object
|
||
deactivate FramePuller
|
||
|
||
Analyzer -> Analyzer: rotate(rotation_angle)\nесли нужно
|
||
|
||
== Запуск детекторов ==
|
||
|
||
Analyzer -> Analyzer: _run_detectors(uin, camera, frame, zones)
|
||
Analyzer -> Detector: request(uin, camera, frame, zones)
|
||
activate Detector
|
||
|
||
== Подготовка Tevian (prepare) ==
|
||
|
||
Detector -> Detector: prepare(cam_name, zones)
|
||
note right
|
||
Подготовка включает:
|
||
1. Создание/получение камеры
|
||
2. Синхронизацию очередей (зон)
|
||
3. Обновление параметров зон
|
||
end note
|
||
|
||
Detector -> Detector: _get_or_create_camera(cam_name)
|
||
|
||
Detector -> RedisCache: get_camera(cam_name)
|
||
activate RedisCache
|
||
RedisCache --> Detector: TCamera или None
|
||
deactivate RedisCache
|
||
|
||
alt Камеры нет в кеше
|
||
Detector -> TevianAPI: TCamera.get_all()
|
||
activate TevianAPI
|
||
TevianAPI -> TevianAPI: _refresh_token() if needed
|
||
TevianAPI -> "Tevian Cloud": GET /api/cameras
|
||
"Tevian Cloud" --> TevianAPI: список камер [{id, name, ...}]
|
||
TevianAPI --> Detector: [TCamera, ...]
|
||
deactivate TevianAPI
|
||
|
||
Detector -> RedisCache: set_camera(cam) для каждой
|
||
|
||
alt Камера все еще не найдена
|
||
Detector -> TevianAPI: TCamera.create(cam_name)
|
||
activate TevianAPI
|
||
TevianAPI -> "Tevian Cloud": POST /api/cameras\n{name, rtsp, frequency_plan_id, ...}
|
||
"Tevian Cloud" --> TevianAPI: {id, name, status, ...}
|
||
TevianAPI --> Detector: TCamera
|
||
deactivate TevianAPI
|
||
Detector -> RedisCache: set_camera(cam)
|
||
end
|
||
end
|
||
|
||
== Синхронизация очередей (зон) ==
|
||
|
||
Detector -> Detector: _get_camera_queues(cam)
|
||
Detector -> RedisCache: get_queue(q_id)\nдля каждого queues_ids камеры
|
||
RedisCache --> Detector: TQueue objects
|
||
|
||
loop Для каждой зоны из запроса
|
||
Detector -> Detector: Конвертировать координаты\nв относительные (0..1)
|
||
|
||
alt Очередь не найдена
|
||
Detector -> TevianAPI: TQueue.create(cam_id, zone_id, polygon, min_head_size)
|
||
activate TevianAPI
|
||
TevianAPI -> "Tevian Cloud": POST /api/queues\n{name, camera_id, roi_polygon_relative, ...}
|
||
"Tevian Cloud" --> TevianAPI: {id, name, camera_id, ...}
|
||
TevianAPI --> Detector: TQueue
|
||
deactivate TevianAPI
|
||
Detector -> RedisCache: set_queue(queue)
|
||
else Параметры зоны изменились
|
||
Detector -> TevianAPI: queue.save()
|
||
activate TevianAPI
|
||
TevianAPI -> "Tevian Cloud": POST /api/queues/{id}\n{roi_polygon_relative, ...}
|
||
"Tevian Cloud" --> TevianAPI: updated queue
|
||
TevianAPI --> Detector: success
|
||
deactivate TevianAPI
|
||
Detector -> RedisCache: set_queue(queue)
|
||
end
|
||
end
|
||
|
||
loop Для старых очередей (не в списке зон)
|
||
Detector -> TevianAPI: TQueue.delete_by_id(queue_id)
|
||
activate TevianAPI
|
||
TevianAPI -> "Tevian Cloud": DELETE /api/queues/{id}
|
||
"Tevian Cloud" --> TevianAPI: success
|
||
TevianAPI --> Detector: success
|
||
deactivate TevianAPI
|
||
Detector -> RedisCache: delete_queue(queue_id)
|
||
end
|
||
|
||
Detector -> TevianAPI: cam.refresh()
|
||
note right
|
||
Обновляем состояние камеры
|
||
после изменения очередей
|
||
end note
|
||
activate TevianAPI
|
||
TevianAPI -> "Tevian Cloud": GET /api/cameras/{id}
|
||
"Tevian Cloud" --> TevianAPI: {status, is_accepting_snapshots, ...}
|
||
TevianAPI --> Detector: updated TCamera
|
||
deactivate TevianAPI
|
||
|
||
Detector -> RedisCache: set_camera(cam)
|
||
|
||
== Отправка снапшота и получение результатов ==
|
||
|
||
Detector -> Detector: Проверка rate limiting\n(FORCED_WAIT_PERIOD)
|
||
note right
|
||
Избегаем HTTP 429: Too Many Requests
|
||
Ждем если запрос слишком частый
|
||
end note
|
||
|
||
alt Слишком частые запросы
|
||
Detector -> Detector: asyncio.sleep(wait_for)
|
||
end
|
||
|
||
Detector -> TevianAPI: cam.send_snapshot(frame.data)
|
||
activate TevianAPI
|
||
TevianAPI -> "Tevian Cloud": POST /api/cameras/{id}/snapshots\nContent-Type: image/jpeg\nbody: <JPEG bytes>
|
||
"Tevian Cloud" --> TevianAPI: {snapshot_accepted_at: <timestamp>}
|
||
TevianAPI --> Detector: timestamp
|
||
deactivate TevianAPI
|
||
|
||
Detector -> Detector: asyncio.sleep(TEVIAN_RECOGNITION_DELAY)\n(рекомендуется 12 сек)
|
||
|
||
Detector -> TevianAPI: TRecognition.get_many(queues_ids, timestamp)
|
||
activate TevianAPI
|
||
|
||
loop Polling до получения результатов или timeout
|
||
TevianAPI -> "Tevian Cloud": GET /api/recognitions?\nqueues_ids={ids}&utc_timestamp={ts}
|
||
"Tevian Cloud" --> TevianAPI: [recognitions...]
|
||
|
||
alt Результатов меньше чем очередей
|
||
TevianAPI -> TevianAPI: await gen.sleep(2)\nи повторить
|
||
else Все результаты получены
|
||
TevianAPI -> TevianAPI: break
|
||
end
|
||
end
|
||
|
||
TevianAPI -> TevianAPI: Фильтровать detections:\nоставить только\nfiltered_status == 'passed_filters'
|
||
TevianAPI --> Detector: [TRecognition, ...]
|
||
deactivate TevianAPI
|
||
|
||
== Форматирование результата ==
|
||
|
||
loop Для каждого recognition
|
||
Detector -> RedisCache: get_queue(rec.queue_id)
|
||
RedisCache --> Detector: TQueue
|
||
|
||
Detector -> Detector: Форматировать objects:\n[{x, y, w, h}, ...]\nиз bbox данных
|
||
|
||
Detector -> Detector: result[queue.name] = {\n 'count': len(objects),\n 'objects': objects\n}
|
||
end
|
||
|
||
Detector --> Analyzer: result dict\n{zone_id: {count, objects}, ...}
|
||
deactivate Detector
|
||
|
||
Analyzer -> Analyzer: _build_zones_info(zones, detected_values)
|
||
note right
|
||
Объединяет данные зон с результатами
|
||
детекторов, определяет length_by_ai
|
||
end note
|
||
|
||
Analyzer -> Analyzer: _get_triggered_zones(zones_info, timestamp)
|
||
note right
|
||
Определяет зоны для подсветки
|
||
на основе trigger_at и trigger_type
|
||
end note
|
||
|
||
== Сохранение результата ==
|
||
|
||
Analyzer -> Analyzer: frame.draw_zones(triggered_zones)
|
||
note right
|
||
Рисует полигоны триггерных зон
|
||
на изображении с прозрачностью
|
||
end note
|
||
|
||
Analyzer -> Analyzer: resize_image(image, 640)
|
||
|
||
Analyzer -> S3: storage.upload_fileobj(\n image, bucket, key, ...)
|
||
activate S3
|
||
S3 --> Analyzer: ObjRef
|
||
deactivate S3
|
||
|
||
Analyzer -> S3: storage.generate_presigned_url(obj_ref)
|
||
activate S3
|
||
S3 --> Analyzer: presigned_url
|
||
deactivate S3
|
||
|
||
Analyzer -> Analyzer: Удалить query params\nиз URL (сделать публичным)
|
||
|
||
== Отправка результата в Central ==
|
||
|
||
Analyzer -> Central: central.send('new_measurement', {\n timestamp,\n camera_id,\n measurement_id,\n image: zones_url,\n timings,\n errors,\n zones: zones_info\n})
|
||
activate Central
|
||
note right
|
||
Отправка через aio_broker
|
||
в очередь 'overmind:input'
|
||
с командой 'new_measurement'
|
||
end note
|
||
Central --> Analyzer: (async, no wait)
|
||
deactivate Central
|
||
|
||
Analyzer -> Analyzer: Обновить БД zones_db:\ndetected_at = time.time()
|
||
|
||
Analyzer --> TaskListener: complete
|
||
deactivate Analyzer
|
||
|
||
TaskListener -> TaskListener: Ожидать следующую задачу
|
||
deactivate TaskListener
|
||
|
||
@enduml
|