package vlogs import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" "time" ) // Client is a VictoriaLogs HTTP client type Client struct { baseURL string httpClient *http.Client timeout time.Duration } // NewClient creates a new VictoriaLogs client func NewClient(baseURL string, timeout time.Duration) *Client { return &Client{ baseURL: baseURL, httpClient: &http.Client{ Timeout: timeout, }, timeout: timeout, } } // Query executes a LogsQL query func (c *Client) Query(ctx context.Context, params QueryParams) (*QueryResult, error) { // Build query parameters queryParams := url.Values{} queryParams.Set("query", params.Query) if !params.Start.IsZero() { queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) } if !params.End.IsZero() { queryParams.Set("end", params.End.Format(time.RFC3339Nano)) } if params.Limit > 0 { queryParams.Set("limit", fmt.Sprintf("%d", params.Limit)) } if params.Offset > 0 { queryParams.Set("offset", fmt.Sprintf("%d", params.Offset)) } // Build URL reqURL := fmt.Sprintf("%s/select/logsql/query?%s", c.baseURL, queryParams.Encode()) // Create request req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } // Execute request resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } defer resp.Body.Close() // Check status code if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("query failed with status %d: %s", resp.StatusCode, string(body)) } // Parse response var result QueryResult if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } return &result, nil } // StatsQuery executes a stats query func (c *Client) StatsQuery(ctx context.Context, params StatsParams) (*StatsResult, error) { queryParams := url.Values{} queryParams.Set("query", params.Query) queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) queryParams.Set("end", params.End.Format(time.RFC3339Nano)) if params.Step != "" { queryParams.Set("step", params.Step) } reqURL := fmt.Sprintf("%s/select/logsql/stats_query_range?%s", c.baseURL, queryParams.Encode()) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to execute stats query: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("stats query failed with status %d: %s", resp.StatusCode, string(body)) } var result StatsResult if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } return &result, nil } // GetFacets gets facet values for a field func (c *Client) GetFacets(ctx context.Context, params FacetsParams) (*FacetsResult, error) { queryParams := url.Values{} queryParams.Set("query", params.Query) queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) queryParams.Set("end", params.End.Format(time.RFC3339Nano)) if params.Field != "" { queryParams.Set("field", params.Field) } reqURL := fmt.Sprintf("%s/select/logsql/facets?%s", c.baseURL, queryParams.Encode()) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to get facets: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("facets query failed with status %d: %s", resp.StatusCode, string(body)) } var result FacetsResult if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } return &result, nil } // GetStreamIDs gets stream IDs for a query func (c *Client) GetStreamIDs(ctx context.Context, params StreamIDsParams) (*StreamIDsResult, error) { queryParams := url.Values{} queryParams.Set("query", params.Query) queryParams.Set("start", params.Start.Format(time.RFC3339Nano)) queryParams.Set("end", params.End.Format(time.RFC3339Nano)) reqURL := fmt.Sprintf("%s/select/logsql/stream_ids?%s", c.baseURL, queryParams.Encode()) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("failed to get stream IDs: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("stream IDs query failed with status %d: %s", resp.StatusCode, string(body)) } var result StreamIDsResult if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } return &result, nil } // Tail streams logs in real-time (returns a channel) func (c *Client) Tail(ctx context.Context, params TailParams) (<-chan LogEntry, <-chan error, error) { queryParams := url.Values{} queryParams.Set("query", params.Query) if params.Limit > 0 { queryParams.Set("limit", fmt.Sprintf("%d", params.Limit)) } reqURL := fmt.Sprintf("%s/select/logsql/tail?%s", c.baseURL, queryParams.Encode()) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { return nil, nil, fmt.Errorf("failed to create request: %w", err) } resp, err := c.httpClient.Do(req) if err != nil { return nil, nil, fmt.Errorf("failed to start tail: %w", err) } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) resp.Body.Close() return nil, nil, fmt.Errorf("tail failed with status %d: %s", resp.StatusCode, string(body)) } logChan := make(chan LogEntry, 100) errChan := make(chan error, 1) // Start goroutine to read streaming response go func() { defer resp.Body.Close() defer close(logChan) defer close(errChan) decoder := json.NewDecoder(resp.Body) for { select { case <-ctx.Done(): return default: var entry LogEntry if err := decoder.Decode(&entry); err != nil { if err != io.EOF { errChan <- fmt.Errorf("failed to decode log entry: %w", err) } return } logChan <- entry } } }() return logChan, errChan, nil } // Ingest sends logs to VictoriaLogs (for testing purposes) func (c *Client) Ingest(ctx context.Context, logs []LogEntry) error { // Convert logs to NDJSON format var buf bytes.Buffer encoder := json.NewEncoder(&buf) for _, log := range logs { if err := encoder.Encode(log); err != nil { return fmt.Errorf("failed to encode log: %w", err) } } reqURL := fmt.Sprintf("%s/insert/jsonline", c.baseURL) req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf) if err != nil { return fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/x-ndjson") resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to ingest logs: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("ingest failed with status %d: %s", resp.StatusCode, string(body)) } return nil }