ivideon: Add some more schemed for crowd service

This commit is contained in:
t0xa 2025-09-08 08:39:27 +03:00
parent be420adc49
commit d5ecc2d577
6 changed files with 589 additions and 218 deletions

View file

@ -0,0 +1,90 @@
@startuml Crowd Analyzer Flow
title Поток анализа очередей в Crowd Analyzer
actor "Redis Queue" as redis
participant "analyze_crowd()" as entry
participant "_analyze_crowd()" as main
participant "frames.pull()" as frames_mod
participant "_run_detectors()" as detectors
participant "TevianHeadsDetector" as tevian
participant "_build_zones_info()" as zones_builder
participant "_get_triggered_zones()" as trigger_checker
participant "Storage" as storage
participant "central.send()" as central
participant "zones_db" as db
redis -> entry: task(uin, camera_id, zones)
activate entry
entry -> entry: acquire TASKS_LICENCES
entry -> main: _analyze_crowd(uin, camera, zones, server_id)
activate main
main -> main: _filter_frequent_tasks()
alt tasks too frequent
main -> entry: return (skip)
else proceed
main -> frames_mod: pull(uin, camera)
activate frames_mod
frames_mod -> main: Frame object
deactivate frames_mod
opt rotation_angle provided
main -> main: frame.rotate(rotation_angle)
end
main -> detectors: _run_detectors(uin, camera, frame, zones)
activate detectors
detectors -> detectors: check tevian enabled & used
detectors -> tevian: request(uin, camera, frame, zones)
activate tevian
tevian -> detectors: detection results
deactivate tevian
detectors -> main: detected_values, timings, errors
deactivate detectors
main -> zones_builder: _build_zones_info(zones, detected_values)
activate zones_builder
zones_builder -> main: zones_info (with AI results)
deactivate zones_builder
main -> trigger_checker: _get_triggered_zones(zones_info, timestamp)
activate trigger_checker
loop for each zone
trigger_checker -> trigger_checker: check trigger conditions
trigger_checker -> trigger_checker: check schedule
trigger_checker -> trigger_checker: check grace period
end
trigger_checker -> main: triggered_zones
deactivate trigger_checker
main -> main: draw zones on frame
main -> storage: upload_fileobj(image_with_zones)
activate storage
storage -> main: zones_url
deactivate storage
main -> central: send('new_measurement', data)
activate central
central -> main: measurement sent
deactivate central
main -> db: update({'_id': f'{uin}/{camera}'}, detected_at)
activate db
db -> main: updated
deactivate db
end
main -> entry: analysis complete
deactivate main
entry -> entry: release TASKS_LICENCES
entry -> redis: task finished
deactivate entry
@enduml

View file

@ -0,0 +1,138 @@
@startuml Crowd Analyzer Architecture
title Архитектура Crowd Analyzer
package "External Systems" {
[Redis Queue] as redis
[Storage Service] as storage
[Tevian API] as tevian_api
[Camera Server] as camera_server
}
package "Crowd Analyzer" {
[analyze_crowd()] as entry_point
[_analyze_crowd()] as main_logic
[_run_detectors()] as detector_runner
[_get_triggered_zones()] as trigger_logic
[_build_zones_info()] as zones_builder
}
package "Detectors" {
[TevianHeadsDetector] as tevian_detector
}
package "Data Access" {
[frames.pull()] as frame_puller
[central.send()] as central_sender
[services_db] as services_db
[zones_db] as zones_db
}
package "Storage" {
[MongoDB Local] as mongo_local
[MongoDB Crowd] as mongo_crowd
}
' External connections
redis --> entry_point : tasks
camera_server <-- frame_puller : get frames
tevian_api <-- tevian_detector : AI requests
storage <-- main_logic : upload images
' Internal flow
entry_point --> main_logic
main_logic --> detector_runner
main_logic --> zones_builder
main_logic --> trigger_logic
detector_runner --> tevian_detector
' Data access
frame_puller --> camera_server
central_sender --> redis : results
services_db --> mongo_local
zones_db --> mongo_local
' Key relationships
main_logic --> frame_puller : get frames
main_logic --> central_sender : send results
main_logic --> zones_db : update status
detector_runner --> services_db : check config
tevian_detector --> tevian_api : detect heads
note right of entry_point
Entry point:
- Semaphore control
- Error handling
- Metrics tracking
end note
note right of main_logic
Main logic:
- Frame processing
- Zone analysis
- Result storage
- Notification sending
end note
note right of detector_runner
Detector runner:
- AI service calls
- Error handling
- Performance tracking
end note
@enduml
Диаграмма состояний зоны:
@startuml Zone State Diagram
title Состояния зоны в процессе анализа
[*] --> Inactive : zone created
state Inactive {
Inactive : length_by_ai = 0
Inactive : no triggers
}
state Active {
Active : length_by_ai > 0
Active : analyzing people count
}
state Triggered {
Triggered : trigger condition met
Triggered : schedule active
Triggered : not in grace period
}
state GracePeriod {
GracePeriod : trigger condition met
GracePeriod : but in grace period
GracePeriod : suppressing notifications
}
Inactive --> Active : people detected
Active --> Inactive : no people detected
Active --> Triggered : trigger_at threshold reached\nAND schedule active\nAND not in grace period
Active --> GracePeriod : trigger_at threshold reached\nBUT in grace period
Triggered --> Active : trigger condition not met
GracePeriod --> Triggered : grace period expired\nAND trigger still met
GracePeriod --> Active : trigger condition not met
note right of Triggered
Actions:
- Zone highlighted on image
- Notification sent
- Grace period started
end note
note right of GracePeriod
Grace period prevents
spam notifications for
zones that constantly
trigger
end note
@enduml

View file

@ -0,0 +1,193 @@
@startuml CrowdReport Class Relationships
!define ENTITY class
!define INTERFACE interface
package "API Concept Layer" {
INTERFACE CrowdReportInterface {
+id: str
+owner_id: str
+type: str
+name: str
+status: str
+created_at: timestamp
+updated_at: timestamp
+progress: int
+options: dict
+create(user, type, options, name)
}
ENTITY APIObject {
+api_method()
+error_codes()
}
package "Errors" {
ENTITY errors.BadRequest
ENTITY errors.FeatureNotSupported
ENTITY errors.BadParameter
ENTITY errors.MalformedSchedule
}
}
package "Crowd Frontend Implementation" {
ENTITY CrowdReport {
+ID_REGEX: str
+COLLECTION: Collection
+MAX_REPORT_INTERVAL: int = 90
+MAX_WORK_TIME_REPORT_INTERVAL: int = 31
+MAX_REPORT_NAME_LEN: int = 60
+create(user, type, options, name)
+delete()
}
ENTITY InitializerMixin {
+_initialize(data)
+COLLECTION
+underscored_name()
}
ENTITY ReportCustomizer {
+RESTRICTION_COLLECTION
+_type: str
+_required_fields: set
+_allowed_fields: set
+_registry: dict
+for_type(report_type)
+validate(options)
+setup(query, options)
+__init_subclass__()
}
ENTITY CrowdOverallStatsReportCustomizer {
+_type = 'crowd_overall_stats_report'
+_required_fields
+_allowed_fields
+setup(query, options)
}
ENTITY CrowdQueuesStatsReportCustomizer {
+_type = 'crowd_queue_stats_report'
+_required_fields
+_allowed_fields
+setup(query, options)
}
ENTITY CrowdWorkTimeReportCustomizer {
+_type = 'crowd_work_time_report'
+_required_fields
+_allowed_fields
+setup(query, options)
}
ENTITY QueryEnricher {
+query: dict
+start_time: int
+end_time: int
+options: dict
+enrich()
-_populate_time_condition()
-_populate_sources()
-_populate_step()
-_populate_max_queue_size()
-_populate_duration_from()
}
}
package "Utilities & Helpers" {
ENTITY api_helpers {
+InitializerMixin
+get_user_timezone(user)
+schedule_parser(schedule)
+get_default_schedule()
+zone_settings_changed()
}
ENTITY validators {
+validate_list_of_strings()
+validate_schedule()
+validate_time_range()
+validate_limit()
+validate_schedule_values()
}
ENTITY helpers {
+get_user_zones(user_id, zone_ids)
+_get_user_zones()
}
}
package "Database & External" {
ENTITY db {
+reports()
+user_reports
+insert_with_random_id()
}
ENTITY time_tools {
+DAY
+to_seconds()
}
ENTITY pendulum {
+from_timestamp()
}
}
' Inheritance relationships
CrowdReportInterface <|-- CrowdReport
APIObject <|-- CrowdReportInterface
InitializerMixin <|-- CrowdReport
ReportCustomizer <|-- CrowdOverallStatsReportCustomizer
ReportCustomizer <|-- CrowdQueuesStatsReportCustomizer
ReportCustomizer <|-- CrowdWorkTimeReportCustomizer
' Composition/Usage relationships
CrowdReport ..> QueryEnricher : creates
CrowdReport ..> ReportCustomizer : uses
CrowdReport ..> db : persists to
CrowdReport ..> time_tools : uses
CrowdReport ..> pendulum : uses
CrowdReport ..> api_helpers : uses get_user_timezone
CrowdReport ..> helpers : uses get_user_zones
QueryEnricher ..> validators : uses validate_list_of_strings
QueryEnricher ..> helpers : uses get_user_zones
ReportCustomizer ..> validators : uses validate_schedule
CrowdReport --> errors.BadRequest : throws
CrowdReport --> errors.FeatureNotSupported : throws
ReportCustomizer --> errors.BadRequest : throws
ReportCustomizer --> errors.FeatureNotSupported : throws
ReportCustomizer --> errors.BadParameter : throws
validators --> errors.BadRequest : throws
validators --> errors.MalformedSchedule : throws
' Registry pattern
ReportCustomizer : <<registry>>
CrowdOverallStatsReportCustomizer ..> ReportCustomizer : auto-registers
CrowdQueuesStatsReportCustomizer ..> ReportCustomizer : auto-registers
CrowdWorkTimeReportCustomizer ..> ReportCustomizer : auto-registers
note right of ReportCustomizer
Uses __init_subclass__ for
automatic subclass registration
in _registry dictionary
end note
note right of CrowdReport
Main report factory class.
Validates inputs, creates queries,
and persists report tasks to MongoDB.
Maximum intervals: 90 days (regular),
31 days (work time reports)
end note
note right of QueryEnricher
Builds MongoDB queries from
user parameters, handles time
ranges, zones, and filtering
end note
@enduml

View file

@ -0,0 +1,101 @@
@startuml Entity Structures
!define ENTITY class
!define COLLECTION database
title Структура сущностей для Crowd Reports Sources
package "Cameras" {
ENTITY Camera_Entity {
+id: "server_id:camera_index"
+server_id: string
+camera_index: string
+name: string
+owner_id: string
}
COLLECTION servers_collection {
_id: ObjectId (server_id)
owner_id: string
name: string
cameras: {
"0": {name: "Камера 1", active: true},
"1": {name: "Камера 2", active: true}
}
cam_services: {
"0": {crowd: {active: true}},
"1": {crowd: {active: true}}
}
}
}
package "Folders" {
ENTITY Folder_Entity {
+id: ObjectId (folder_id)
+name: string
+owner_id: string
+parents: List[string]
+objects: List[object_info]
}
COLLECTION folders_collection {
_id: ObjectId (folder_id)
owner_id: string
name: string
parents: []
objects: [
{object_type: "camera", object_id: "server:0"},
{object_type: "camera", object_id: "server:1"}
]
}
}
package "Detection Zones" {
ENTITY Zone_Entity {
+id: string (zone_id)
+camera_id: "server_id:camera_index"
+owner_id: string
+name: string
+polygon: List[coordinates]
}
COLLECTION zones_collection {
_id: string (zone_id)
owner_id: string
camera_id: "server_id:camera_index"
name: string
polygon: [...]
deleted: false
}
}
' Relationships
Camera_Entity --> servers_collection : stored in
Folder_Entity --> folders_collection : stored in
Zone_Entity --> zones_collection : stored in
servers_collection --> zones_collection : "camera_id links"
folders_collection --> servers_collection : "contains camera references"
note right of Camera_Entity
ID составляется из:
server_id + ":" + camera_index
Пример:
"507f1f77bcf86cd799439011:0"
end note
note right of Folder_Entity
objects[] может содержать:
- cameras
- другие folders (вложенность)
- другие типы объектов
end note
note right of Zone_Entity
Каждая зона привязана
к конкретной камере через
camera_id поле
end note
@enduml

View file

@ -1,35 +1,3 @@
@startuml _get_all_user_cameras Sequence Diagram
title Последовательность выполнения _get_all_user_cameras
participant "Caller" as caller
participant "_get_all_user_cameras" as main_func
participant "_get_servers" as get_servers
participant "MongoDB" as mongo
participant "ivideon.servers" as servers_collection
note over main_func
Входные параметры:
- user_id: int
- requested_cameras: list[str]
(формат: ["server1:0", "server1:1"])
- service_name: str (например: "crowd")
end note
caller -> main_func: _get_all_user_cameras(user_id, requested_cameras, service_name)
activate main_func
main_func -> main_func: cameras = {}
main_func -> get_servers: _get_servers(requested_cameras)
activate get_servers
note over get_servers
Извлекает server_ids из camera_ids:
["server1:0", "server1:1"]
→ ["server1", "server1"]
→ ["server1"]
end note
@startuml _get_all_user_cameras Activity Diagram
title Алгоритм работы _get_all_user_cameras
@ -139,188 +107,3 @@ note left
end note
@enduml
ggVG
get_servers -> get_servers: requested_server_ids = [camera_id.split(':')[0] \\nfor camera_id in requested_camera_ids]
get_servers -> get_servers: query = {\n 'deleted': {'$ne': True},\n '_id': {'$in': requested_server_ids}\n}
get_servers -> get_servers: projection = {\n '_id': 1, 'owner_id': 1, 'name': 1,\n 'cameras': 1, 'cam_services': 1,\n 'info': 1, 'timezone': 1\n}
get_servers -> mongo: db.ivideon().servers.find(query, projection)
activate mongo
mongo -> servers_collection: find documents
activate servers_collection
servers_collection -> mongo: return server documents
deactivate servers_collection
mongo -> get_servers: list[server_documents]
deactivate mongo
get_servers -> main_func: return servers_list
deactivate get_servers
loop for each server in servers_list
main_func -> main_func: server@startuml _get_all_user_cameras Activity Diagram
title Алгоритм работы _get_all_user_cameras
start
note right
**Входные параметры:**
• user_id: int
• requested_cameras: list[str]
(формат: ["server1:0", "server1:1"])
• service_name: str (например: "crowd")
end note
:Инициализация cameras = {};
:Извлечь server_ids из requested_cameras|
note right
["server1:0", "server1:1"]
→ ["server1"]
end note
:Построить MongoDB запрос:
query = {
'deleted': {'$ne': True},
'_id': {'$in': server_ids}
}|
:Задать проекцию полей:
projection = {
'_id': 1, 'owner_id': 1, 'name': 1,
'cameras': 1, 'cam_services': 1,
'info': 1, 'timezone': 1
}|
:Выполнить запрос к MongoDB:
servers = db.ivideon().servers.find(query, projection)|
partition "Обработка серверов" {
:Взять следующий server;
while (Есть серверы для обработки?) is (да)
:server_id = server['_id'];
:is_shared = server['owner_id'] != user_id;
:server_build_type = server.get('info', {}).get('build_type', '');
:is_server_embedded = server_build_type.endswith('camera');
:cam_services = server.get('cam_services', {});
partition "Обработка камер сервера" {
:Взять следующую камеру (camera_idx, camera_data);
while (Есть камеры на сервере?) is (да)
:service_info = cam_services.get(camera_idx, {})
.get(service_name, {});
if (service_info.get('active', False) == True?) then (да)
:camera_id = f'{server_id}:{camera_idx}';
if (is_server_embedded?) then (да)
:camera_name = server['name'];
else (нет)
:camera_name = camera_data.get('name');
endif
:cameras[camera_id] = {
'id': camera_id,
'owner_id': server['owner_id'],
'server': server_id,
'name': camera_name,
'is_shared': is_shared,
'timezone': server.get('timezone') or
server.get('timezone_default'),
'is_embedded': is_server_embedded
};
else (нет)
note right: Камера пропускается - сервис неактивен
endif
:Взять следующую камеру (camera_idx, camera_data);
endwhile (нет)
}
:Взять следующий server;
endwhile (нет)
}
:return cameras;
stop
note left
**Результат:** dict[camera_id, camera_info]
**Пример:**
{
"507f...439011:0": {
"id": "507f...439011:0",
"owner_id": "user123",
"server": "507f...439011",
"name": "Камера входа",
"is_shared": false,
"timezone": "Europe/Moscow",
"is_embedded": false
}
}
end note
@enduml
ggVG_id = server['_id']
main_func -> main_func: is_shared = server['owner_id'] != user_id
main_func -> main_func: server_build_type = server.get('info', {}).get('build_type', '')
main_func -> main_func: is_server_embedded = server_build_type.endswith('camera')
main_func -> main_func: cam_services = server.get('cam_services', {})
loop for camera_idx, camera_data in server.cameras.items()
main_func -> main_func: service_info = cam_services.get(camera_idx, {})\\n .get(service_name, {})
alt service_info.get('active', False) == True
main_func -> main_func: camera_id = f'{server_id}:{camera_idx}'
alt is_server_embedded == True
main_func -> main_func: camera_name = server['name']
else
main_func -> main_func: camera_name = camera_data.get('name')
end
main_func -> main_func: cameras[camera_id] = {\n 'id': camera_id,\n 'owner_id': server['owner_id'],\n 'server': server_id,\n 'name': camera_name,\n 'is_shared': is_shared,\n 'timezone': server.timezone,\n 'is_embedded': is_server_embedded\n}
note right
Создается полная информация
о камере для возврата
end note
else
note right
Камера пропускается:
сервис неактивен
end note
end
end
end
main_func -> caller: return cameras dict
deactivate main_func
note over caller
Результат: dict[camera_id, camera_info]
где camera_id = "server_id:camera_index"
Пример:
{
"507f...439011:0": {
"id": "507f...439011:0",
"owner_id": "user123",
"server": "507f...439011",
"name": "Камера входа",
"is_shared": false,
"timezone": "Europe/Moscow",
"is_embedded": false
}
}
end note
@enduml

View file

@ -0,0 +1,66 @@
@startuml Crowd Reports System
title Система отчетов Crowd
actor User
participant "CrowdReport API" as api
database "reports.user_reports" as queue_db
participant "Report Builder Worker" as worker
participant "OverallStatsReport" as overall
participant "QueueStatsReport" as queues
participant "WorkTimeReport" as worktime
database "crowd.measurements" as measurements_db
database "crowd.detected_queues" as queues_db
participant "Excel Generator" as excel
participant "Storage (S3)" as storage
== Создание отчета ==
User -> api: POST /crowd_reports?op=CREATE
api -> api: QueryEnricher._populate_sources()
note right: zones.id = ['zone1', 'zone2']
api -> queue_db: insert task (status='in_queue')
api -> User: report created
== Обработка очереди ==
loop continuous
worker -> queue_db: find_one_and_update(status='in_queue')
alt task found
queue_db -> worker: report task
worker -> worker: setup_context(report)
alt overall_stats_report
worker -> overall: make_report()
overall -> measurements_db: aggregate({'zones.id': {'$in': [...]}})
measurements_db -> overall: measurements data
else queue_stats_report
worker -> queues: make_report()
queues -> queues_db: find({'zone_id': {'$in': [...]}})
queues_db -> queues: queues data
else work_time_report
worker -> worktime: make_report()
worktime -> measurements_db: find + schedule analysis
measurements_db -> worktime: filtered data
end
worker -> excel: generate XLSX
excel -> worker: Excel file
worker -> storage: save_to_s3()
worker -> queue_db: update(status='done')
else no tasks
worker -> worker: sleep(SLEEP_INTERVAL)
end
end
== Получение результата ==
User -> api: GET /crowd_reports/{id}
api -> queue_db: find report
alt status='done'
queue_db -> api: report with download_url
api -> User: report ready
else status='in_progress'
api -> User: report in progress
else status='failed'
api -> User: report failed
end
@enduml