Files
victorialogs-manager/backend/internal/services/logs_service.go
Claude Code c9b6c5bf74 feat: Create Log Query Service (Issue #12)
- Create LogsService with VictoriaLogs client integration
- Implement QueryLogs with role-based limits
- Implement StatsQuery for time-series data
- Implement GetFacets for field facets
- Implement TailLogs for real-time streaming
- Implement ExportLogs with format support
- Add query validation and safety checks
- Add role-based rate limiting (default/max limits per role)
- Auto-calculate appropriate time range and step

Closes #12

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 00:56:57 +03:00

313 lines
7.7 KiB
Go

package services
import (
"context"
"fmt"
"time"
"github.com/yourusername/victorialogs-manager/internal/models"
"github.com/yourusername/victorialogs-manager/pkg/vlogs"
)
// LogsService implements the log querying operations of this package
// (query, stats, facets, tail and export) on top of a VictoriaLogs client.
type LogsService struct {
// vlogsClient is the client every service method sends its requests through.
vlogsClient *vlogs.Client
}
// NewLogsService returns a LogsService that sends all of its requests
// through the given VictoriaLogs client. The client is stored as-is.
func NewLogsService(vlogsClient *vlogs.Client) *LogsService {
	svc := new(LogsService)
	svc.vlogsClient = vlogsClient
	return svc
}
// QueryLogsRequest carries the parameters accepted by LogsService.QueryLogs.
//
// Query is the only field tagged as required. QueryLogs replaces a zero
// Limit, StartTime or EndTime with defaults, and forwards Offset to the
// VictoriaLogs client unchanged.
//
// NOTE(review): encoding/json's "omitempty" never omits a struct value such
// as time.Time, so start_time and end_time are always emitted when this type
// is marshaled.
type QueryLogsRequest struct {
Query string `json:"query" binding:"required"`
StartTime time.Time `json:"start_time,omitempty"`
EndTime time.Time `json:"end_time,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
// QueryLogsResponse is the result returned by LogsService.QueryLogs.
//
// Logs, TotalCount and Took are copied from the VictoriaLogs client result;
// Query echoes the query string of the request.
type QueryLogsResponse struct {
Logs []vlogs.LogEntry `json:"logs"`
TotalCount int `json:"total_count"`
// Serialized as "took_ms"; presumably a duration in milliseconds — confirm
// against the vlogs client.
Took int `json:"took_ms"`
Query string `json:"query"`
}
// QueryLogs runs a LogsQL query through the VictoriaLogs client after
// validating the request and applying role-based result limits.
//
// The request is normalized in place, so the caller can read back the
// effective values afterwards:
//   - a Limit of zero or less is replaced with the default limit for userRole;
//   - a Limit above the maximum for userRole is clamped to that maximum;
//   - a zero EndTime becomes the current time;
//   - a zero StartTime becomes 15 minutes before EndTime.
//
// It returns an error when req is nil, the query fails validation, Offset is
// negative, EndTime lies before StartTime, or the client call fails (the
// client error is wrapped with %w).
func (s *LogsService) QueryLogs(ctx context.Context, req *QueryLogsRequest, userRole models.Role) (*QueryLogsResponse, error) {
	if req == nil {
		return nil, fmt.Errorf("request cannot be nil")
	}
	if err := s.validateQuery(req.Query); err != nil {
		return nil, fmt.Errorf("invalid query: %w", err)
	}
	if req.Offset < 0 {
		return nil, fmt.Errorf("offset cannot be negative")
	}

	// A non-positive limit means "not specified". A negative value would
	// otherwise pass the cap check below untouched and reach the client.
	if req.Limit <= 0 {
		req.Limit = s.getDefaultLimit(userRole)
	}
	if maxLimit := s.getMaxLimit(userRole); req.Limit > maxLimit {
		req.Limit = maxLimit
	}

	// Resolve EndTime first and anchor the default window to it, so that an
	// explicit EndTime in the past combined with a missing StartTime still
	// yields a valid 15-minute range instead of start > end.
	if req.EndTime.IsZero() {
		req.EndTime = time.Now()
	}
	if req.StartTime.IsZero() {
		req.StartTime = req.EndTime.Add(-15 * time.Minute)
	}
	if req.EndTime.Before(req.StartTime) {
		return nil, fmt.Errorf("invalid time range: end time is before start time")
	}

	params := vlogs.QueryParams{
		Query:  req.Query,
		Start:  req.StartTime,
		End:    req.EndTime,
		Limit:  req.Limit,
		Offset: req.Offset,
	}
	result, err := s.vlogsClient.Query(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("query failed: %w", err)
	}

	return &QueryLogsResponse{
		Logs:       result.Logs,
		TotalCount: result.TotalCount,
		Took:       result.Took,
		Query:      req.Query,
	}, nil
}
// StatsQueryRequest carries the parameters accepted by LogsService.StatsQuery.
//
// Query, StartTime and EndTime are tagged as required. Step is optional: when
// it is empty, StatsQuery derives one from the span between StartTime and
// EndTime.
type StatsQueryRequest struct {
Query string `json:"query" binding:"required"`
StartTime time.Time `json:"start_time" binding:"required"`
EndTime time.Time `json:"end_time" binding:"required"`
Step string `json:"step,omitempty"` // e.g., "5m", "1h"
}
// StatsQueryResponse is the result returned by LogsService.StatsQuery.
// Data holds the data points copied from the VictoriaLogs client result.
type StatsQueryResponse struct {
Data []vlogs.StatsDataPoint `json:"data"`
}
// StatsQuery runs a stats query through the VictoriaLogs client and returns
// the resulting data points.
//
// When req.Step is empty it is filled in (on req itself) with a step derived
// from the length of the requested time range.
//
// It returns an error when req is nil, the query fails validation, EndTime
// lies before StartTime, or the client call fails (the client error is
// wrapped with %w).
func (s *LogsService) StatsQuery(ctx context.Context, req *StatsQueryRequest) (*StatsQueryResponse, error) {
	if req == nil {
		return nil, fmt.Errorf("request cannot be nil")
	}
	if err := s.validateQuery(req.Query); err != nil {
		return nil, fmt.Errorf("invalid query: %w", err)
	}
	// Reject an inverted range up front: its negative duration would
	// otherwise silently select the smallest step and be sent to the client.
	if req.EndTime.Before(req.StartTime) {
		return nil, fmt.Errorf("invalid time range: end time is before start time")
	}

	if req.Step == "" {
		req.Step = s.calculateStep(req.EndTime.Sub(req.StartTime))
	}

	params := vlogs.StatsParams{
		Query: req.Query,
		Start: req.StartTime,
		End:   req.EndTime,
		Step:  req.Step,
	}
	result, err := s.vlogsClient.StatsQuery(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("stats query failed: %w", err)
	}

	return &StatsQueryResponse{Data: result.Data}, nil
}
// GetFacetsRequest carries the parameters accepted by LogsService.GetFacets.
//
// None of the fields is tagged as required. GetFacets replaces a zero
// StartTime or EndTime with defaults and forwards Query and Field to the
// VictoriaLogs client.
type GetFacetsRequest struct {
Query string `json:"query"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
// Presumably the name of the field whose values are faceted — confirm
// against vlogs.FacetsParams.
Field string `json:"field,omitempty"`
}
// GetFacetsResponse is the result returned by LogsService.GetFacets.
// Facets holds the facets copied from the VictoriaLogs client result.
type GetFacetsResponse struct {
Facets []vlogs.Facet `json:"facets"`
}
// GetFacets fetches facets for the given query and field through the
// VictoriaLogs client.
//
// Query is optional for this call (the request type does not mark it as
// required), so an empty query is forwarded unchanged; a non-empty query must
// pass the same validation as the other query entry points.
//
// The time range is normalized on req itself: a zero EndTime becomes the
// current time and a zero StartTime becomes 15 minutes before EndTime.
//
// It returns an error when req is nil, a supplied query fails validation,
// EndTime lies before StartTime, or the client call fails (the client error
// is wrapped with %w).
func (s *LogsService) GetFacets(ctx context.Context, req *GetFacetsRequest) (*GetFacetsResponse, error) {
	if req == nil {
		return nil, fmt.Errorf("request cannot be nil")
	}
	if req.Query != "" {
		if err := s.validateQuery(req.Query); err != nil {
			return nil, fmt.Errorf("invalid query: %w", err)
		}
	}

	// Resolve EndTime first and anchor the default window to it, so that an
	// explicit EndTime in the past combined with a missing StartTime still
	// yields a valid range instead of start > end.
	if req.EndTime.IsZero() {
		req.EndTime = time.Now()
	}
	if req.StartTime.IsZero() {
		req.StartTime = req.EndTime.Add(-15 * time.Minute)
	}
	if req.EndTime.Before(req.StartTime) {
		return nil, fmt.Errorf("invalid time range: end time is before start time")
	}

	params := vlogs.FacetsParams{
		Query: req.Query,
		Start: req.StartTime,
		End:   req.EndTime,
		Field: req.Field,
	}
	result, err := s.vlogsClient.GetFacets(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("facets query failed: %w", err)
	}

	return &GetFacetsResponse{Facets: result.Facets}, nil
}
// TailLogsRequest carries the parameters accepted by LogsService.TailLogs.
//
// Query is tagged as required. A zero Limit is replaced with 100 by TailLogs
// before the request is handed to the VictoriaLogs client.
type TailLogsRequest struct {
Query string `json:"query" binding:"required"`
Limit int `json:"limit,omitempty"`
}
// TailLogs starts a live tail for the given query through the VictoriaLogs
// client and returns whatever the client's Tail call returns: a channel of
// log entries, a channel of errors, and an error.
//
// A Limit of zero or less is treated as "not specified" and replaced (on req
// itself) with 100.
//
// It returns a non-nil error, and nil channels, when req is nil or the query
// fails validation; the client is not called in that case.
func (s *LogsService) TailLogs(ctx context.Context, req *TailLogsRequest) (<-chan vlogs.LogEntry, <-chan error, error) {
	if req == nil {
		return nil, nil, fmt.Errorf("request cannot be nil")
	}
	if err := s.validateQuery(req.Query); err != nil {
		return nil, nil, fmt.Errorf("invalid query: %w", err)
	}

	// Negative values are as meaningless as zero here; do not forward them.
	if req.Limit <= 0 {
		req.Limit = 100
	}

	params := vlogs.TailParams{
		Query: req.Query,
		Limit: req.Limit,
	}
	return s.vlogsClient.Tail(ctx, params)
}
// ExportLogsRequest carries the parameters accepted by LogsService.ExportLogs.
//
// Query, StartTime and EndTime are tagged as required.
//
// NOTE(review): ExportLogs does not read Format; presumably the caller
// encodes the returned entries in the requested format — confirm.
type ExportLogsRequest struct {
Query string `json:"query" binding:"required"`
StartTime time.Time `json:"start_time" binding:"required"`
EndTime time.Time `json:"end_time" binding:"required"`
Format string `json:"format"` // "json" or "csv"
}
// ExportLogs fetches the log entries to be exported for the given query and
// time range through the VictoriaLogs client.
//
// The number of entries is capped by the export limit for userRole, and the
// query always starts at offset 0. The entries are returned as-is:
// req.Format is not consulted here, so encoding them (for example as JSON or
// CSV) is left to the caller.
//
// It returns an error when req is nil, the query fails validation, EndTime
// lies before StartTime, or the client call fails (the client error is
// wrapped with %w).
func (s *LogsService) ExportLogs(ctx context.Context, req *ExportLogsRequest, userRole models.Role) ([]vlogs.LogEntry, error) {
	if req == nil {
		return nil, fmt.Errorf("request cannot be nil")
	}
	if err := s.validateQuery(req.Query); err != nil {
		return nil, fmt.Errorf("invalid query: %w", err)
	}
	if req.EndTime.Before(req.StartTime) {
		return nil, fmt.Errorf("invalid time range: end time is before start time")
	}

	params := vlogs.QueryParams{
		Query:  req.Query,
		Start:  req.StartTime,
		End:    req.EndTime,
		Limit:  s.getMaxExportLimit(userRole),
		Offset: 0,
	}
	result, err := s.vlogsClient.Query(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("export query failed: %w", err)
	}
	return result.Logs, nil
}
// validateQuery performs basic sanity checks on a LogsQL query string.
//
// It returns an error when the query is empty or longer than 10000
// characters, and nil otherwise. The length is measured in Unicode code
// points rather than bytes, so the limit matches the wording of the error
// message for non-ASCII queries as well. No syntax checking is performed.
func (s *LogsService) validateQuery(query string) error {
	const maxQueryChars = 10000

	if query == "" {
		return fmt.Errorf("query cannot be empty")
	}

	// A string never has more runes than bytes, so the rune count only needs
	// to be computed once the byte length already exceeds the limit.
	if len(query) > maxQueryChars && len([]rune(query)) > maxQueryChars {
		return fmt.Errorf("query too long (max %d characters)", maxQueryChars)
	}

	return nil
}
// getDefaultLimit reports the result limit applied when a query request does
// not specify one: 1000 for admins, 500 for editors and analysts, and 100 for
// viewers and any other role.
func (s *LogsService) getDefaultLimit(role models.Role) int {
	// Viewers and unrecognized roles share the most restrictive value.
	limit := 100
	switch role {
	case models.RoleAdmin:
		limit = 1000
	case models.RoleEditor, models.RoleAnalyst:
		limit = 500
	}
	return limit
}
// getMaxLimit reports the largest result limit a role may request in a
// query: 10000 for admins, 5000 for editors and analysts, and 1000 for
// viewers and any other role.
func (s *LogsService) getMaxLimit(role models.Role) int {
	// Viewers and unrecognized roles share the most restrictive value.
	limit := 1000
	switch role {
	case models.RoleAdmin:
		limit = 10000
	case models.RoleEditor, models.RoleAnalyst:
		limit = 5000
	}
	return limit
}
// getMaxExportLimit reports how many log entries a role may export in one
// call: 100000 for admins, 50000 for editors and analysts, and 10000 for
// viewers and any other role.
func (s *LogsService) getMaxExportLimit(role models.Role) int {
	// Viewers and unrecognized roles share the most restrictive value.
	limit := 10000
	switch role {
	case models.RoleAdmin:
		limit = 100000
	case models.RoleEditor, models.RoleAnalyst:
		limit = 50000
	}
	return limit
}
// calculateStep picks the step used for a stats query from the length of the
// queried time range: "1m" up to one hour, "5m" up to six hours, "15m" up to
// one day, "1h" up to seven days, and "6h" for anything longer. Each upper
// bound is inclusive.
func (s *LogsService) calculateStep(duration time.Duration) string {
	const day = 24 * time.Hour

	// Ascending upper bounds; the first tier that covers duration wins.
	tiers := [...]struct {
		upTo time.Duration
		step string
	}{
		{time.Hour, "1m"},
		{6 * time.Hour, "5m"},
		{day, "15m"},
		{7 * day, "1h"},
	}
	for _, tier := range tiers {
		if duration <= tier.upTo {
			return tier.step
		}
	}
	return "6h"
}