Merge pull request #7331 from dokku/k3s-one-off-fixes

Output logs for completed containers launched by the k3s scheduler
This commit is contained in:
Jose Diaz-Gonzalez
2024-11-04 14:42:20 -05:00
committed by GitHub
4 changed files with 211 additions and 106 deletions

View File

@@ -2,6 +2,7 @@ package common
import (
"fmt"
"io"
"os"
"strconv"
"strings"
@@ -46,6 +47,30 @@ func (w *writer) Write(bytes []byte) (int, error) {
return len(bytes), nil
}
// PrefixingWriter is a writer that prefixes all writes with a given prefix
type PrefixingWriter struct {
Prefix []byte
Writer io.Writer
}
// Write writes the given bytes to the writer with the prefix
func (pw *PrefixingWriter) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
// Perform an "atomic" write of a prefix and p to make sure that it doesn't interleave
// sub-line when used concurrently with io.PipeWrite.
n, err := pw.Writer.Write(append(pw.Prefix, p...))
if n > len(p) {
// To comply with the io.Writer interface requirements we must
// return a number of bytes written from p (0 <= n <= len(p)),
// so we are ignoring the length of the prefix here.
return len(p), err
}
return n, err
}
// LogFail is the failure log formatter
// prints text to stderr and exits with status 1
func LogFail(text string) {

View File

@@ -36,6 +36,9 @@ import (
// EnterPodInput contains all the information needed to enter a pod
type EnterPodInput struct {
// AllowCompletion is whether to allow the command to complete
AllowCompletion bool
// Clientset is the kubernetes clientset
Clientset KubernetesClient
@@ -106,12 +109,26 @@ type WaitForNodeToExistInput struct {
}
type WaitForPodBySelectorRunningInput struct {
Clientset KubernetesClient
Namespace string
// AllowCompletion is whether to allow the command to complete
AllowCompletion bool
// Clientset is the kubernetes clientset
Clientset KubernetesClient
// Namespace is the namespace to search in
Namespace string
// LabelSelector is the label selector to search for
LabelSelector string
PodName string
Timeout float64
Waiter func(ctx context.Context, clientset KubernetesClient, podName, namespace string) wait.ConditionWithContextFunc
// PodName is the pod name to search for
PodName string
// Timeout is the timeout in seconds to wait for the pod to be ready
Timeout float64
// Waiter is the waiter function
Waiter func(ctx context.Context, clientset KubernetesClient, podName, namespace string) wait.ConditionWithContextFunc
}
type WaitForPodToExistInput struct {
@@ -379,12 +396,13 @@ func enterPod(ctx context.Context, input EnterPodInput) error {
}
err := waitForPodBySelectorRunning(ctx, WaitForPodBySelectorRunningInput{
Clientset: input.Clientset,
Namespace: input.SelectedPod.Namespace,
LabelSelector: strings.Join(labelSelector, ","),
PodName: input.SelectedPod.Name,
Timeout: input.WaitTimeout,
Waiter: isPodReady,
AllowCompletion: input.AllowCompletion,
Clientset: input.Clientset,
Namespace: input.SelectedPod.Namespace,
LabelSelector: strings.Join(labelSelector, ","),
PodName: input.SelectedPod.Name,
Timeout: input.WaitTimeout,
Waiter: isPodReady,
})
if err != nil {
return fmt.Errorf("Error waiting for pod to be ready: %w", err)
@@ -1604,8 +1622,6 @@ func isK3sKubernetes() bool {
func isPodReady(ctx context.Context, clientset KubernetesClient, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
fmt.Printf(".")
pod, err := clientset.GetPod(ctx, GetPodInput{
Name: podName,
Namespace: namespace,
@@ -1694,7 +1710,7 @@ func waitForPodBySelectorRunning(ctx context.Context, input WaitForPodBySelector
RetryCount: 3,
})
if err != nil {
return err
return fmt.Errorf("Error waiting for pod to exist: %w", err)
}
if len(pods) == 0 {
@@ -1709,8 +1725,12 @@ func waitForPodBySelectorRunning(ctx context.Context, input WaitForPodBySelector
if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, false, input.Waiter(ctx, input.Clientset, pod.Name, pod.Namespace)); err != nil {
print("\n")
return err
if input.AllowCompletion && errors.Is(err, conditions.ErrPodCompleted) {
return nil
}
return fmt.Errorf("Error waiting for pod %s to be running: %w", pod.Name, err)
}
fmt.Printf(".")
}
print("\n")
return nil
@@ -1759,6 +1779,11 @@ func waitForPodToExist(ctx context.Context, input WaitForPodToExistInput) ([]v1.
time.Sleep(1 * time.Second)
}
if len(pods) == 0 {
time.Sleep(1 * time.Second)
continue
}
if input.PodName == "" {
break
}

View File

@@ -10,7 +10,9 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/dokku/dokku/plugins/common"
"github.com/fatih/color"
@@ -19,7 +21,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -213,14 +215,14 @@ func (k KubernetesClient) CreateJob(ctx context.Context, input CreateJobInput) (
// CreateNamespaceInput contains all the information needed to create a Kubernetes namespace
type CreateNamespaceInput struct {
// Name is the name of the Kubernetes namespace
Name v1.Namespace
Name corev1.Namespace
}
// CreateNamespace creates a Kubernetes namespace
func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNamespaceInput) (v1.Namespace, error) {
func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNamespaceInput) (corev1.Namespace, error) {
namespaces, err := k.ListNamespaces(ctx)
if err != nil {
return v1.Namespace{}, err
return corev1.Namespace{}, err
}
for _, namespace := range namespaces {
@@ -231,11 +233,11 @@ func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNames
namespace, err := k.Client.CoreV1().Namespaces().Create(ctx, &input.Name, metav1.CreateOptions{})
if err != nil {
return v1.Namespace{}, err
return corev1.Namespace{}, err
}
if namespace == nil {
return v1.Namespace{}, errors.New("namespace is nil")
return corev1.Namespace{}, errors.New("namespace is nil")
}
return *namespace, err
@@ -419,14 +421,14 @@ type GetPodInput struct {
}
// GetJob gets a Kubernetes job
func (k KubernetesClient) GetPod(ctx context.Context, input GetPodInput) (v1.Pod, error) {
func (k KubernetesClient) GetPod(ctx context.Context, input GetPodInput) (corev1.Pod, error) {
pod, err := k.Client.CoreV1().Pods(input.Namespace).Get(ctx, input.Name, metav1.GetOptions{})
if err != nil {
return v1.Pod{}, err
return corev1.Pod{}, err
}
if pod == nil {
return v1.Pod{}, errors.New("pod is nil")
return corev1.Pod{}, errors.New("pod is nil")
}
return *pod, err
@@ -578,13 +580,13 @@ func (k KubernetesClient) ListIngresses(ctx context.Context, input ListIngresses
}
// ListNamespaces lists Kubernetes namespaces
func (k KubernetesClient) ListNamespaces(ctx context.Context) ([]v1.Namespace, error) {
func (k KubernetesClient) ListNamespaces(ctx context.Context) ([]corev1.Namespace, error) {
namespaces, err := k.Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return []v1.Namespace{}, err
return []corev1.Namespace{}, err
}
if namespaces == nil {
return []v1.Namespace{}, errors.New("namespaces is nil")
return []corev1.Namespace{}, errors.New("namespaces is nil")
}
return namespaces.Items, nil
@@ -597,7 +599,7 @@ type ListNodesInput struct {
}
// ListNodes lists Kubernetes nodes
func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) ([]v1.Node, error) {
func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) ([]corev1.Node, error) {
listOptions := metav1.ListOptions{}
if input.LabelSelector != "" {
common.LogDebug(fmt.Sprintf("Using label selector: %s", input.LabelSelector))
@@ -605,11 +607,11 @@ func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) (
}
nodeList, err := k.Client.CoreV1().Nodes().List(ctx, listOptions)
if err != nil {
return []v1.Node{}, err
return []corev1.Node{}, err
}
if nodeList == nil {
return []v1.Node{}, errors.New("pod list is nil")
return []corev1.Node{}, errors.New("pod list is nil")
}
return nodeList.Items, err
@@ -625,15 +627,15 @@ type ListPodsInput struct {
}
// ListPods lists Kubernetes pods
func (k KubernetesClient) ListPods(ctx context.Context, input ListPodsInput) ([]v1.Pod, error) {
func (k KubernetesClient) ListPods(ctx context.Context, input ListPodsInput) ([]corev1.Pod, error) {
listOptions := metav1.ListOptions{LabelSelector: input.LabelSelector}
podList, err := k.Client.CoreV1().Pods(input.Namespace).List(ctx, listOptions)
if err != nil {
return []v1.Pod{}, err
return []corev1.Pod{}, err
}
if podList == nil {
return []v1.Pod{}, errors.New("pod list is nil")
return []corev1.Pod{}, errors.New("pod list is nil")
}
return podList.Items, err
@@ -708,23 +710,26 @@ func (k KubernetesClient) ScaleDeployment(ctx context.Context, input ScaleDeploy
}
type StreamLogsInput struct {
// Namespace is the Kubernetes namespace
Namespace string
// DeploymentName is the Kubernetes deployment name
DeploymentName string
// ContainerName is the Kubernetes container name
ContainerName string
// Follow is whether to follow the logs
Follow bool
// TailLines is the number of lines to tail
TailLines int64
// LabelSelector is the Kubernetes label selector
LabelSelector []string
// Namespace is the Kubernetes namespace
Namespace string
// Quiet is whether to suppress output
Quiet bool
// SinceSeconds is the number of seconds to go back
SinceSeconds int64
// TailLines is the number of lines to tail
TailLines int64
}
func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput) error {
@@ -743,7 +748,7 @@ func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput)
return fmt.Errorf("kubernetes api not available: %w", err)
}
labelSelector := []string{fmt.Sprintf("app.kubernetes.io/part-of=%s", input.DeploymentName)}
labelSelector := input.LabelSelector
processIndex := 0
if input.ContainerName != "" {
parts := strings.SplitN(input.ContainerName, ".", 2)
@@ -766,15 +771,39 @@ func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput)
return fmt.Errorf("Error listing pods: %w", err)
}
if len(pods) == 0 {
return fmt.Errorf("No pods found for app %s", input.DeploymentName)
return fmt.Errorf("No pods found matching specified labels")
}
ch := make(chan bool)
if os.Getenv("FORCE_TTY") == "1" {
color.NoColor = false
}
logOptions := corev1.PodLogOptions{
Follow: input.Follow,
}
if input.TailLines > 0 {
logOptions.TailLines = ptr.To(input.TailLines)
}
if input.SinceSeconds != 0 {
// round up to the nearest second
sec := int64(time.Duration(input.SinceSeconds * int64(time.Second)).Seconds())
logOptions.SinceSeconds = &sec
}
requests := make([]rest.ResponseWrapper, len(pods))
for i := 0; i < len(pods); i++ {
if processIndex > 0 && i != (processIndex-1) {
continue
}
podName := pods[i].Name
requests[i] = k.Client.CoreV1().Pods(input.Namespace).GetLogs(podName, &logOptions)
}
reader, writer := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))
colors := []color.Attribute{
color.FgRed,
color.FgYellow,
@@ -783,59 +812,71 @@ func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput)
color.FgBlue,
color.FgMagenta,
}
// colorIndex := 0
for i := 0; i < len(pods); i++ {
if processIndex > 0 && i != (processIndex-1) {
continue
}
logOptions := v1.PodLogOptions{
Follow: input.Follow,
}
if input.TailLines > 0 {
logOptions.TailLines = ptr.To(input.TailLines)
}
for i := 0; i < len(requests); i++ {
request := requests[i]
podName := pods[i].Name
podColor := colors[i%len(colors)]
dynoText := color.New(podColor).SprintFunc()
podName := pods[i].Name
podLogs, err := k.Client.CoreV1().Pods(input.Namespace).GetLogs(podName, &logOptions).Stream(ctx)
if err != nil {
prefix := dynoText(fmt.Sprintf("app[%s]: ", podName))
go func(ctx context.Context, request rest.ResponseWrapper, prefix string) {
defer wg.Done()
out := k.addPrefixingWriter(writer, prefix, input.Quiet)
if err := streamLogsFromRequest(ctx, request, out); err != nil {
// check if error is context canceled
if errors.Is(err, context.Canceled) {
writer.Close()
return
}
writer.CloseWithError(err)
return
}
}(ctx, request, prefix)
}
go func() {
wg.Wait()
writer.Close()
}()
_, err = io.Copy(os.Stdout, reader)
return err
}
func (k KubernetesClient) addPrefixingWriter(writer *io.PipeWriter, prefix string, quiet bool) io.Writer {
if quiet {
return writer
}
return &common.PrefixingWriter{
Prefix: []byte(prefix),
Writer: writer,
}
}
func streamLogsFromRequest(ctx context.Context, request rest.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(ctx)
if err != nil {
return err
}
defer readCloser.Close()
r := bufio.NewReader(readCloser)
for {
bytes, err := r.ReadBytes('\n')
if _, err := out.Write(bytes); err != nil {
return err
}
buffer := bufio.NewReader(podLogs)
go func(ctx context.Context, buffer *bufio.Reader, prettyText func(a ...interface{}) string, ch chan bool) {
defer func() {
ch <- true
}()
for {
select {
case <-ctx.Done(): // if cancel() execute
ch <- true
return
default:
str, readErr := buffer.ReadString('\n')
if readErr == io.EOF {
break
}
if str == "" {
continue
}
if !input.Quiet {
str = fmt.Sprintf("%s %s", dynoText(fmt.Sprintf("app[%s]:", podName)), str)
}
_, err := fmt.Print(str)
if err != nil {
return
}
}
if err != nil {
if err != io.EOF {
return err
}
}(ctx, buffer, dynoText, ch)
return nil
}
}
<-ch
return nil
}

View File

@@ -817,12 +817,12 @@ func TriggerSchedulerLogs(scheduler string, appName string, processType string,
}
return clientset.StreamLogs(context.Background(), StreamLogsInput{
DeploymentName: appName,
Namespace: getComputedNamespace(appName),
ContainerName: processType,
TailLines: numLines,
Follow: tail,
Quiet: quiet,
Namespace: getComputedNamespace(appName),
ContainerName: processType,
LabelSelector: []string{fmt.Sprintf("app.kubernetes.io/part-of=%s", appName)},
TailLines: numLines,
Follow: tail,
Quiet: quiet,
})
}
@@ -1168,7 +1168,7 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
}
batchJobSelector := fmt.Sprintf("batch.kubernetes.io/job-name=%s", createdJob.Name)
pods, err := waitForPodToExist(ctx, WaitForPodToExistInput{
_, err = waitForPodToExist(ctx, WaitForPodToExistInput{
Clientset: clientset,
Namespace: namespace,
RetryCount: 3,
@@ -1203,6 +1203,18 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
return fmt.Errorf("Error completed pod: %w", err)
}
selectedPod := pods[0]
err := clientset.StreamLogs(ctx, StreamLogsInput{
ContainerName: processType,
Follow: false,
LabelSelector: []string{batchJobSelector},
Namespace: namespace,
Quiet: true,
SinceSeconds: 10,
})
if err != nil {
return fmt.Errorf("Error streaming logs: %w", err)
}
if selectedPod.Status.Phase == v1.PodFailed {
for _, status := range selectedPod.Status.ContainerStatuses {
if status.Name != fmt.Sprintf("%s-%s", appName, processType) {
@@ -1216,13 +1228,14 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
return fmt.Errorf("Unable to attach as the pod has already exited with a failed exit code")
} else if selectedPod.Status.Phase == v1.PodSucceeded {
return errors.New("Unable to attach as the pod has already exited with a successful exit code")
return nil
}
}
return fmt.Errorf("Error waiting for pod to be running: %w", err)
}
pods, err = clientset.ListPods(ctx, ListPodsInput{
pods, err := clientset.ListPods(ctx, ListPodsInput{
Namespace: namespace,
LabelSelector: batchJobSelector,
})
@@ -1250,11 +1263,12 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
}
case v1.PodRunning:
return enterPod(ctx, EnterPodInput{
Clientset: clientset,
Command: command,
Entrypoint: entrypoint,
SelectedPod: selectedPod,
WaitTimeout: 10,
AllowCompletion: true,
Clientset: clientset,
Command: command,
Entrypoint: entrypoint,
SelectedPod: selectedPod,
WaitTimeout: 10,
})
default:
return fmt.Errorf("Unable to attach as the pod is in an unknown state: %s", selectedPod.Status.Phase)