Files
Claude Code 158b9a1630 feat: Create VictoriaLogs HTTP Client (Issue #11)
- Create VictoriaLogs client package
- Implement Query method for LogsQL queries
- Implement StatsQuery for time-series stats
- Implement GetFacets for field facets
- Implement GetStreamIDs for stream identification
- Implement Tail for real-time log streaming
- Add Ingest method for testing
- Define all data models (LogEntry, QueryResult, etc.)
- Add comprehensive error handling

Closes #11

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 00:56:23 +03:00

280 lines
7.5 KiB
Go

package vlogs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
)
// Client is a VictoriaLogs HTTP client
type Client struct {
baseURL string
httpClient *http.Client
timeout time.Duration
}
// NewClient creates a new VictoriaLogs client
func NewClient(baseURL string, timeout time.Duration) *Client {
return &Client{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: timeout,
},
timeout: timeout,
}
}
// Query executes a LogsQL query
func (c *Client) Query(ctx context.Context, params QueryParams) (*QueryResult, error) {
// Build query parameters
queryParams := url.Values{}
queryParams.Set("query", params.Query)
if !params.Start.IsZero() {
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
}
if !params.End.IsZero() {
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
}
if params.Limit > 0 {
queryParams.Set("limit", fmt.Sprintf("%d", params.Limit))
}
if params.Offset > 0 {
queryParams.Set("offset", fmt.Sprintf("%d", params.Offset))
}
// Build URL
reqURL := fmt.Sprintf("%s/select/logsql/query?%s", c.baseURL, queryParams.Encode())
// Create request
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
// Execute request
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer resp.Body.Close()
// Check status code
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("query failed with status %d: %s", resp.StatusCode, string(body))
}
// Parse response
var result QueryResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// StatsQuery executes a stats query
func (c *Client) StatsQuery(ctx context.Context, params StatsParams) (*StatsResult, error) {
queryParams := url.Values{}
queryParams.Set("query", params.Query)
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
if params.Step != "" {
queryParams.Set("step", params.Step)
}
reqURL := fmt.Sprintf("%s/select/logsql/stats_query_range?%s", c.baseURL, queryParams.Encode())
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute stats query: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("stats query failed with status %d: %s", resp.StatusCode, string(body))
}
var result StatsResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// GetFacets gets facet values for a field
func (c *Client) GetFacets(ctx context.Context, params FacetsParams) (*FacetsResult, error) {
queryParams := url.Values{}
queryParams.Set("query", params.Query)
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
if params.Field != "" {
queryParams.Set("field", params.Field)
}
reqURL := fmt.Sprintf("%s/select/logsql/facets?%s", c.baseURL, queryParams.Encode())
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to get facets: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("facets query failed with status %d: %s", resp.StatusCode, string(body))
}
var result FacetsResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// GetStreamIDs gets stream IDs for a query
func (c *Client) GetStreamIDs(ctx context.Context, params StreamIDsParams) (*StreamIDsResult, error) {
queryParams := url.Values{}
queryParams.Set("query", params.Query)
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
reqURL := fmt.Sprintf("%s/select/logsql/stream_ids?%s", c.baseURL, queryParams.Encode())
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to get stream IDs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("stream IDs query failed with status %d: %s", resp.StatusCode, string(body))
}
var result StreamIDsResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// Tail streams logs in real-time (returns a channel)
func (c *Client) Tail(ctx context.Context, params TailParams) (<-chan LogEntry, <-chan error, error) {
queryParams := url.Values{}
queryParams.Set("query", params.Query)
if params.Limit > 0 {
queryParams.Set("limit", fmt.Sprintf("%d", params.Limit))
}
reqURL := fmt.Sprintf("%s/select/logsql/tail?%s", c.baseURL, queryParams.Encode())
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, nil, fmt.Errorf("failed to start tail: %w", err)
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, nil, fmt.Errorf("tail failed with status %d: %s", resp.StatusCode, string(body))
}
logChan := make(chan LogEntry, 100)
errChan := make(chan error, 1)
// Start goroutine to read streaming response
go func() {
defer resp.Body.Close()
defer close(logChan)
defer close(errChan)
decoder := json.NewDecoder(resp.Body)
for {
select {
case <-ctx.Done():
return
default:
var entry LogEntry
if err := decoder.Decode(&entry); err != nil {
if err != io.EOF {
errChan <- fmt.Errorf("failed to decode log entry: %w", err)
}
return
}
logChan <- entry
}
}
}()
return logChan, errChan, nil
}
// Ingest sends logs to VictoriaLogs (for testing purposes)
func (c *Client) Ingest(ctx context.Context, logs []LogEntry) error {
// Convert logs to NDJSON format
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
for _, log := range logs {
if err := encoder.Encode(log); err != nil {
return fmt.Errorf("failed to encode log: %w", err)
}
}
reqURL := fmt.Sprintf("%s/insert/jsonline", c.baseURL)
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/x-ndjson")
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to ingest logs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("ingest failed with status %d: %s", resp.StatusCode, string(body))
}
return nil
}