- Create VictoriaLogs client package - Implement Query method for LogsQL queries - Implement StatsQuery for time-series stats - Implement GetFacets for field facets - Implement GetStreamIDs for stream identification - Implement Tail for real-time log streaming - Add Ingest method for testing - Define all data models (LogEntry, QueryResult, etc.) - Add comprehensive error handling Closes #11 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
280 lines
7.5 KiB
Go
280 lines
7.5 KiB
Go
package vlogs
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"time"
|
|
)
|
|
|
|
// Client is a VictoriaLogs HTTP client
|
|
type Client struct {
|
|
baseURL string
|
|
httpClient *http.Client
|
|
timeout time.Duration
|
|
}
|
|
|
|
// NewClient creates a new VictoriaLogs client
|
|
func NewClient(baseURL string, timeout time.Duration) *Client {
|
|
return &Client{
|
|
baseURL: baseURL,
|
|
httpClient: &http.Client{
|
|
Timeout: timeout,
|
|
},
|
|
timeout: timeout,
|
|
}
|
|
}
|
|
|
|
// Query executes a LogsQL query
|
|
func (c *Client) Query(ctx context.Context, params QueryParams) (*QueryResult, error) {
|
|
// Build query parameters
|
|
queryParams := url.Values{}
|
|
queryParams.Set("query", params.Query)
|
|
|
|
if !params.Start.IsZero() {
|
|
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
|
|
}
|
|
if !params.End.IsZero() {
|
|
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
|
|
}
|
|
if params.Limit > 0 {
|
|
queryParams.Set("limit", fmt.Sprintf("%d", params.Limit))
|
|
}
|
|
if params.Offset > 0 {
|
|
queryParams.Set("offset", fmt.Sprintf("%d", params.Offset))
|
|
}
|
|
|
|
// Build URL
|
|
reqURL := fmt.Sprintf("%s/select/logsql/query?%s", c.baseURL, queryParams.Encode())
|
|
|
|
// Create request
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
// Execute request
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute query: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Check status code
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("query failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
// Parse response
|
|
var result QueryResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
// StatsQuery executes a stats query
|
|
func (c *Client) StatsQuery(ctx context.Context, params StatsParams) (*StatsResult, error) {
|
|
queryParams := url.Values{}
|
|
queryParams.Set("query", params.Query)
|
|
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
|
|
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
|
|
|
|
if params.Step != "" {
|
|
queryParams.Set("step", params.Step)
|
|
}
|
|
|
|
reqURL := fmt.Sprintf("%s/select/logsql/stats_query_range?%s", c.baseURL, queryParams.Encode())
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute stats query: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("stats query failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result StatsResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
// GetFacets gets facet values for a field
|
|
func (c *Client) GetFacets(ctx context.Context, params FacetsParams) (*FacetsResult, error) {
|
|
queryParams := url.Values{}
|
|
queryParams.Set("query", params.Query)
|
|
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
|
|
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
|
|
|
|
if params.Field != "" {
|
|
queryParams.Set("field", params.Field)
|
|
}
|
|
|
|
reqURL := fmt.Sprintf("%s/select/logsql/facets?%s", c.baseURL, queryParams.Encode())
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get facets: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("facets query failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result FacetsResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
// GetStreamIDs gets stream IDs for a query
|
|
func (c *Client) GetStreamIDs(ctx context.Context, params StreamIDsParams) (*StreamIDsResult, error) {
|
|
queryParams := url.Values{}
|
|
queryParams.Set("query", params.Query)
|
|
queryParams.Set("start", params.Start.Format(time.RFC3339Nano))
|
|
queryParams.Set("end", params.End.Format(time.RFC3339Nano))
|
|
|
|
reqURL := fmt.Sprintf("%s/select/logsql/stream_ids?%s", c.baseURL, queryParams.Encode())
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get stream IDs: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return nil, fmt.Errorf("stream IDs query failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
var result StreamIDsResult
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
// Tail streams logs in real-time (returns a channel)
|
|
func (c *Client) Tail(ctx context.Context, params TailParams) (<-chan LogEntry, <-chan error, error) {
|
|
queryParams := url.Values{}
|
|
queryParams.Set("query", params.Query)
|
|
|
|
if params.Limit > 0 {
|
|
queryParams.Set("limit", fmt.Sprintf("%d", params.Limit))
|
|
}
|
|
|
|
reqURL := fmt.Sprintf("%s/select/logsql/tail?%s", c.baseURL, queryParams.Encode())
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to start tail: %w", err)
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
return nil, nil, fmt.Errorf("tail failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
logChan := make(chan LogEntry, 100)
|
|
errChan := make(chan error, 1)
|
|
|
|
// Start goroutine to read streaming response
|
|
go func() {
|
|
defer resp.Body.Close()
|
|
defer close(logChan)
|
|
defer close(errChan)
|
|
|
|
decoder := json.NewDecoder(resp.Body)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
var entry LogEntry
|
|
if err := decoder.Decode(&entry); err != nil {
|
|
if err != io.EOF {
|
|
errChan <- fmt.Errorf("failed to decode log entry: %w", err)
|
|
}
|
|
return
|
|
}
|
|
logChan <- entry
|
|
}
|
|
}
|
|
}()
|
|
|
|
return logChan, errChan, nil
|
|
}
|
|
|
|
// Ingest sends logs to VictoriaLogs (for testing purposes)
|
|
func (c *Client) Ingest(ctx context.Context, logs []LogEntry) error {
|
|
// Convert logs to NDJSON format
|
|
var buf bytes.Buffer
|
|
encoder := json.NewEncoder(&buf)
|
|
|
|
for _, log := range logs {
|
|
if err := encoder.Encode(log); err != nil {
|
|
return fmt.Errorf("failed to encode log: %w", err)
|
|
}
|
|
}
|
|
|
|
reqURL := fmt.Sprintf("%s/insert/jsonline", c.baseURL)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/x-ndjson")
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to ingest logs: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return fmt.Errorf("ingest failed with status %d: %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
return nil
|
|
}
|