diff --git a/backend/pkg/vlogs/client.go b/backend/pkg/vlogs/client.go new file mode 100644 index 0000000..1968a95 --- /dev/null +++ b/backend/pkg/vlogs/client.go @@ -0,0 +1,279 @@ +package vlogs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" +) + +// Client is a VictoriaLogs HTTP client +type Client struct { + baseURL string + httpClient *http.Client + timeout time.Duration +} + +// NewClient creates a new VictoriaLogs client +func NewClient(baseURL string, timeout time.Duration) *Client { + return &Client{ + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: timeout, + }, + timeout: timeout, + } +} + +// Query executes a LogsQL query +func (c *Client) Query(ctx context.Context, params QueryParams) (*QueryResult, error) { + // Build query parameters + queryParams := url.Values{} + queryParams.Set("query", params.Query) + + if !params.Start.IsZero() { + queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) + } + if !params.End.IsZero() { + queryParams.Set("end", params.End.Format(time.RFC3339Nano)) + } + if params.Limit > 0 { + queryParams.Set("limit", fmt.Sprintf("%d", params.Limit)) + } + if params.Offset > 0 { + queryParams.Set("offset", fmt.Sprintf("%d", params.Offset)) + } + + // Build URL + reqURL := fmt.Sprintf("%s/select/logsql/query?%s", c.baseURL, queryParams.Encode()) + + // Create request + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Execute request + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer resp.Body.Close() + + // Check status code + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("query failed with status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var result QueryResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// StatsQuery executes a stats query +func (c *Client) StatsQuery(ctx context.Context, params StatsParams) (*StatsResult, error) { + queryParams := url.Values{} + queryParams.Set("query", params.Query) + queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) + queryParams.Set("end", params.End.Format(time.RFC3339Nano)) + + if params.Step != "" { + queryParams.Set("step", params.Step) + } + + reqURL := fmt.Sprintf("%s/select/logsql/stats_query_range?%s", c.baseURL, queryParams.Encode()) + + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute stats query: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("stats query failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result StatsResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// GetFacets gets facet values for a field +func (c *Client) GetFacets(ctx context.Context, params FacetsParams) (*FacetsResult, error) { + queryParams := url.Values{} + queryParams.Set("query", params.Query) + queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) + queryParams.Set("end", params.End.Format(time.RFC3339Nano)) + + if params.Field != "" { + queryParams.Set("field", params.Field) + } + + reqURL := fmt.Sprintf("%s/select/logsql/facets?%s", c.baseURL, queryParams.Encode()) + + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to get facets: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("facets query failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result FacetsResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// GetStreamIDs gets stream IDs for a query +func (c *Client) GetStreamIDs(ctx context.Context, params StreamIDsParams) (*StreamIDsResult, error) { + queryParams := url.Values{} + queryParams.Set("query", params.Query) + queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) + queryParams.Set("end", params.End.Format(time.RFC3339Nano)) + + reqURL := fmt.Sprintf("%s/select/logsql/stream_ids?%s", c.baseURL, queryParams.Encode()) + + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to get stream IDs: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("stream IDs query failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result StreamIDsResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// Tail streams logs in real-time (returns a channel) +func (c *Client) Tail(ctx context.Context, params TailParams) (<-chan LogEntry, <-chan error, error) { + queryParams := url.Values{} + queryParams.Set("query", params.Query) + + if params.Limit > 0 { + queryParams.Set("limit", fmt.Sprintf("%d", params.Limit)) + } + + reqURL := fmt.Sprintf("%s/select/logsql/tail?%s", c.baseURL, queryParams.Encode()) + + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, nil, fmt.Errorf("failed to create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, nil, fmt.Errorf("failed to start tail: %w", err) + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, nil, fmt.Errorf("tail failed with status %d: %s", resp.StatusCode, string(body)) + } + + logChan := make(chan LogEntry, 100) + errChan := make(chan error, 1) + + // Start goroutine to read streaming response + go func() { + defer resp.Body.Close() + defer close(logChan) + defer close(errChan) + + decoder := json.NewDecoder(resp.Body) + for { + select { + case <-ctx.Done(): + return + default: + var entry LogEntry + if err := decoder.Decode(&entry); err != nil { + if err != io.EOF { + errChan <- fmt.Errorf("failed to decode log entry: %w", err) + } + return + } + logChan <- entry + } + } + }() + + return logChan, errChan, nil +} + +// Ingest sends logs to VictoriaLogs (for testing purposes) +func (c *Client) Ingest(ctx context.Context, logs []LogEntry) error { + // Convert logs to NDJSON format + var buf bytes.Buffer + encoder := json.NewEncoder(&buf) + + for _, log := range logs { + if err := encoder.Encode(log); err != nil { + return fmt.Errorf("failed to encode log: %w", err) + } + } + + reqURL := fmt.Sprintf("%s/insert/jsonline", c.baseURL) + + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/x-ndjson") + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to ingest logs: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("ingest failed with status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} diff --git a/backend/pkg/vlogs/models.go b/backend/pkg/vlogs/models.go new file mode 100644 index 0000000..4902ead --- /dev/null +++ b/backend/pkg/vlogs/models.go @@ -0,0 +1,83 @@ +package vlogs + +import "time" + +// QueryParams represents parameters for a LogsQL query +type QueryParams struct { + Query string `json:"query"` + Start time.Time `json:"start,omitempty"` + End time.Time `json:"end,omitempty"` + Limit int `json:"limit,omitempty"` + Offset int `json:"offset,omitempty"` +} + +// LogEntry represents a single log entry +type LogEntry struct { + Timestamp time.Time `json:"_time"` + StreamID string `json:"_stream_id,omitempty"` + Message string `json:"_msg"` + Fields map[string]interface{} `json:"fields,omitempty"` +} + +// QueryResult represents the result of a query +type QueryResult struct { + Logs []LogEntry `json:"logs"` + TotalCount int `json:"total_count"` + Took int `json:"took_ms"` +} + +// StatsParams represents parameters for a stats query +type StatsParams struct { + Query string `json:"query"` + Start time.Time `json:"start"` + End time.Time `json:"end"` + Step string `json:"step,omitempty"` // e.g., "5m", "1h" +} + +// StatsResult represents the result of a stats query +type StatsResult struct { + Data []StatsDataPoint `json:"data"` +} + +// StatsDataPoint represents a single data point in stats +type StatsDataPoint struct { + Timestamp time.Time `json:"timestamp"` + Values map[string]interface{} `json:"values"` +} + +// FacetsParams represents parameters for getting facets +type FacetsParams struct { + Query string `json:"query"` + Start time.Time `json:"start"` + End time.Time `json:"end"` + Field string `json:"field,omitempty"` +} + +// FacetsResult represents the result of a facets query +type FacetsResult struct { + Facets []Facet `json:"facets"` +} + +// Facet represents a facet value with count +type Facet struct { + Value string `json:"value"` + Count int `json:"count"` +} + +// StreamIDsParams represents parameters for getting stream IDs +type StreamIDsParams struct { + Query string `json:"query"` + Start time.Time `json:"start"` + End time.Time `json:"end"` +} + +// StreamIDsResult represents the result of stream IDs query +type StreamIDsResult struct { + StreamIDs []string `json:"stream_ids"` +} + +// TailParams represents parameters for tailing logs +type TailParams struct { + Query string `json:"query"` + Limit int `json:"limit,omitempty"` +}