refactor: output logs for completed containers

Previously, a container that exited immediately would just show an error message, which is probably not what we want.

If the pod has exited, we will display the logs on stdout for the user and then exit appropriately.

This change also refactors how log streaming works so that we respect exiting when the stream has no content (such as when the container has exited). Previously, the container would just hang around forever due to not being properly canceled. Due to this refactor, the prefix is now split out into a separate writer.
This commit is contained in:
Jose Diaz-Gonzalez
2024-11-04 02:49:20 -05:00
parent 2368161132
commit ec41dd2e70
4 changed files with 211 additions and 106 deletions

View File

@@ -2,6 +2,7 @@ package common
import (
"fmt"
"io"
"os"
"strconv"
"strings"
@@ -46,6 +47,30 @@ func (w *writer) Write(bytes []byte) (int, error) {
return len(bytes), nil
}
// PrefixingWriter is a writer that prefixes all writes with a given prefix
type PrefixingWriter struct {
Prefix []byte
Writer io.Writer
}
// Write writes the given bytes to the writer with the prefix
func (pw *PrefixingWriter) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
// Perform an "atomic" write of a prefix and p to make sure that it doesn't interleave
// sub-line when used concurrently with io.PipeWrite.
n, err := pw.Writer.Write(append(pw.Prefix, p...))
if n > len(p) {
// To comply with the io.Writer interface requirements we must
// return a number of bytes written from p (0 <= n <= len(p)),
// so we are ignoring the length of the prefix here.
return len(p), err
}
return n, err
}
// LogFail is the failure log formatter
// prints text to stderr and exits with status 1
func LogFail(text string) {

View File

@@ -36,6 +36,9 @@ import (
// EnterPodInput contains all the information needed to enter a pod
type EnterPodInput struct {
// AllowCompletion is whether to allow the command to complete
AllowCompletion bool
// Clientset is the kubernetes clientset
Clientset KubernetesClient
@@ -106,12 +109,26 @@ type WaitForNodeToExistInput struct {
}
type WaitForPodBySelectorRunningInput struct {
Clientset KubernetesClient
Namespace string
// AllowCompletion is whether to allow the command to complete
AllowCompletion bool
// Clientset is the kubernetes clientset
Clientset KubernetesClient
// Namespace is the namespace to search in
Namespace string
// LabelSelector is the label selector to search for
LabelSelector string
PodName string
Timeout float64
Waiter func(ctx context.Context, clientset KubernetesClient, podName, namespace string) wait.ConditionWithContextFunc
// PodName is the pod name to search for
PodName string
// Timeout is the timeout in seconds to wait for the pod to be ready
Timeout float64
// Waiter is the waiter function
Waiter func(ctx context.Context, clientset KubernetesClient, podName, namespace string) wait.ConditionWithContextFunc
}
type WaitForPodToExistInput struct {
@@ -379,12 +396,13 @@ func enterPod(ctx context.Context, input EnterPodInput) error {
}
err := waitForPodBySelectorRunning(ctx, WaitForPodBySelectorRunningInput{
Clientset: input.Clientset,
Namespace: input.SelectedPod.Namespace,
LabelSelector: strings.Join(labelSelector, ","),
PodName: input.SelectedPod.Name,
Timeout: input.WaitTimeout,
Waiter: isPodReady,
AllowCompletion: input.AllowCompletion,
Clientset: input.Clientset,
Namespace: input.SelectedPod.Namespace,
LabelSelector: strings.Join(labelSelector, ","),
PodName: input.SelectedPod.Name,
Timeout: input.WaitTimeout,
Waiter: isPodReady,
})
if err != nil {
return fmt.Errorf("Error waiting for pod to be ready: %w", err)
@@ -1604,8 +1622,6 @@ func isK3sKubernetes() bool {
func isPodReady(ctx context.Context, clientset KubernetesClient, podName, namespace string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
fmt.Printf(".")
pod, err := clientset.GetPod(ctx, GetPodInput{
Name: podName,
Namespace: namespace,
@@ -1694,7 +1710,7 @@ func waitForPodBySelectorRunning(ctx context.Context, input WaitForPodBySelector
RetryCount: 3,
})
if err != nil {
return err
return fmt.Errorf("Error waiting for pod to exist: %w", err)
}
if len(pods) == 0 {
@@ -1709,8 +1725,12 @@ func waitForPodBySelectorRunning(ctx context.Context, input WaitForPodBySelector
if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, false, input.Waiter(ctx, input.Clientset, pod.Name, pod.Namespace)); err != nil {
print("\n")
return err
if input.AllowCompletion && errors.Is(err, conditions.ErrPodCompleted) {
return nil
}
return fmt.Errorf("Error waiting for pod %s to be running: %w", pod.Name, err)
}
fmt.Printf(".")
}
print("\n")
return nil
@@ -1759,6 +1779,11 @@ func waitForPodToExist(ctx context.Context, input WaitForPodToExistInput) ([]v1.
time.Sleep(1 * time.Second)
}
if len(pods) == 0 {
time.Sleep(1 * time.Second)
continue
}
if input.PodName == "" {
break
}

View File

@@ -10,7 +10,9 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/dokku/dokku/plugins/common"
"github.com/fatih/color"
@@ -19,7 +21,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -213,14 +215,14 @@ func (k KubernetesClient) CreateJob(ctx context.Context, input CreateJobInput) (
// CreateNamespaceInput contains all the information needed to create a Kubernetes namespace
type CreateNamespaceInput struct {
// Name is the name of the Kubernetes namespace
Name v1.Namespace
Name corev1.Namespace
}
// CreateNamespace creates a Kubernetes namespace
func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNamespaceInput) (v1.Namespace, error) {
func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNamespaceInput) (corev1.Namespace, error) {
namespaces, err := k.ListNamespaces(ctx)
if err != nil {
return v1.Namespace{}, err
return corev1.Namespace{}, err
}
for _, namespace := range namespaces {
@@ -231,11 +233,11 @@ func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNames
namespace, err := k.Client.CoreV1().Namespaces().Create(ctx, &input.Name, metav1.CreateOptions{})
if err != nil {
return v1.Namespace{}, err
return corev1.Namespace{}, err
}
if namespace == nil {
return v1.Namespace{}, errors.New("namespace is nil")
return corev1.Namespace{}, errors.New("namespace is nil")
}
return *namespace, err
@@ -419,14 +421,14 @@ type GetPodInput struct {
}
// GetJob gets a Kubernetes job
func (k KubernetesClient) GetPod(ctx context.Context, input GetPodInput) (v1.Pod, error) {
func (k KubernetesClient) GetPod(ctx context.Context, input GetPodInput) (corev1.Pod, error) {
pod, err := k.Client.CoreV1().Pods(input.Namespace).Get(ctx, input.Name, metav1.GetOptions{})
if err != nil {
return v1.Pod{}, err
return corev1.Pod{}, err
}
if pod == nil {
return v1.Pod{}, errors.New("pod is nil")
return corev1.Pod{}, errors.New("pod is nil")
}
return *pod, err
@@ -578,13 +580,13 @@ func (k KubernetesClient) ListIngresses(ctx context.Context, input ListIngresses
}
// ListNamespaces lists Kubernetes namespaces
func (k KubernetesClient) ListNamespaces(ctx context.Context) ([]v1.Namespace, error) {
func (k KubernetesClient) ListNamespaces(ctx context.Context) ([]corev1.Namespace, error) {
namespaces, err := k.Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return []v1.Namespace{}, err
return []corev1.Namespace{}, err
}
if namespaces == nil {
return []v1.Namespace{}, errors.New("namespaces is nil")
return []corev1.Namespace{}, errors.New("namespaces is nil")
}
return namespaces.Items, nil
@@ -597,7 +599,7 @@ type ListNodesInput struct {
}
// ListNodes lists Kubernetes nodes
func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) ([]v1.Node, error) {
func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) ([]corev1.Node, error) {
listOptions := metav1.ListOptions{}
if input.LabelSelector != "" {
common.LogDebug(fmt.Sprintf("Using label selector: %s", input.LabelSelector))
@@ -605,11 +607,11 @@ func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) (
}
nodeList, err := k.Client.CoreV1().Nodes().List(ctx, listOptions)
if err != nil {
return []v1.Node{}, err
return []corev1.Node{}, err
}
if nodeList == nil {
return []v1.Node{}, errors.New("pod list is nil")
return []corev1.Node{}, errors.New("pod list is nil")
}
return nodeList.Items, err
@@ -625,15 +627,15 @@ type ListPodsInput struct {
}
// ListPods lists Kubernetes pods
func (k KubernetesClient) ListPods(ctx context.Context, input ListPodsInput) ([]v1.Pod, error) {
func (k KubernetesClient) ListPods(ctx context.Context, input ListPodsInput) ([]corev1.Pod, error) {
listOptions := metav1.ListOptions{LabelSelector: input.LabelSelector}
podList, err := k.Client.CoreV1().Pods(input.Namespace).List(ctx, listOptions)
if err != nil {
return []v1.Pod{}, err
return []corev1.Pod{}, err
}
if podList == nil {
return []v1.Pod{}, errors.New("pod list is nil")
return []corev1.Pod{}, errors.New("pod list is nil")
}
return podList.Items, err
@@ -708,23 +710,26 @@ func (k KubernetesClient) ScaleDeployment(ctx context.Context, input ScaleDeploy
}
type StreamLogsInput struct {
// Namespace is the Kubernetes namespace
Namespace string
// DeploymentName is the Kubernetes deployment name
DeploymentName string
// ContainerName is the Kubernetes container name
ContainerName string
// Follow is whether to follow the logs
Follow bool
// TailLines is the number of lines to tail
TailLines int64
// LabelSelector is the Kubernetes label selector
LabelSelector []string
// Namespace is the Kubernetes namespace
Namespace string
// Quiet is whether to suppress output
Quiet bool
// SinceSeconds is the number of seconds to go back
SinceSeconds int64
// TailLines is the number of lines to tail
TailLines int64
}
func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput) error {
@@ -743,7 +748,7 @@ func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput)
return fmt.Errorf("kubernetes api not available: %w", err)
}
labelSelector := []string{fmt.Sprintf("app.kubernetes.io/part-of=%s", input.DeploymentName)}
labelSelector := input.LabelSelector
processIndex := 0
if input.ContainerName != "" {
parts := strings.SplitN(input.ContainerName, ".", 2)
@@ -766,15 +771,39 @@ func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput)
return fmt.Errorf("Error listing pods: %w", err)
}
if len(pods) == 0 {
return fmt.Errorf("No pods found for app %s", input.DeploymentName)
return fmt.Errorf("No pods found matching specified labels")
}
ch := make(chan bool)
if os.Getenv("FORCE_TTY") == "1" {
color.NoColor = false
}
logOptions := corev1.PodLogOptions{
Follow: input.Follow,
}
if input.TailLines > 0 {
logOptions.TailLines = ptr.To(input.TailLines)
}
if input.SinceSeconds != 0 {
// round up to the nearest second
sec := int64(time.Duration(input.SinceSeconds * int64(time.Second)).Seconds())
logOptions.SinceSeconds = &sec
}
requests := make([]rest.ResponseWrapper, len(pods))
for i := 0; i < len(pods); i++ {
if processIndex > 0 && i != (processIndex-1) {
continue
}
podName := pods[i].Name
requests[i] = k.Client.CoreV1().Pods(input.Namespace).GetLogs(podName, &logOptions)
}
reader, writer := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))
colors := []color.Attribute{
color.FgRed,
color.FgYellow,
@@ -783,59 +812,71 @@ func (k KubernetesClient) StreamLogs(ctx context.Context, input StreamLogsInput)
color.FgBlue,
color.FgMagenta,
}
// colorIndex := 0
for i := 0; i < len(pods); i++ {
if processIndex > 0 && i != (processIndex-1) {
continue
}
logOptions := v1.PodLogOptions{
Follow: input.Follow,
}
if input.TailLines > 0 {
logOptions.TailLines = ptr.To(input.TailLines)
}
for i := 0; i < len(requests); i++ {
request := requests[i]
podName := pods[i].Name
podColor := colors[i%len(colors)]
dynoText := color.New(podColor).SprintFunc()
podName := pods[i].Name
podLogs, err := k.Client.CoreV1().Pods(input.Namespace).GetLogs(podName, &logOptions).Stream(ctx)
if err != nil {
prefix := dynoText(fmt.Sprintf("app[%s]: ", podName))
go func(ctx context.Context, request rest.ResponseWrapper, prefix string) {
defer wg.Done()
out := k.addPrefixingWriter(writer, prefix, input.Quiet)
if err := streamLogsFromRequest(ctx, request, out); err != nil {
// check if error is context canceled
if errors.Is(err, context.Canceled) {
writer.Close()
return
}
writer.CloseWithError(err)
return
}
}(ctx, request, prefix)
}
go func() {
wg.Wait()
writer.Close()
}()
_, err = io.Copy(os.Stdout, reader)
return err
}
func (k KubernetesClient) addPrefixingWriter(writer *io.PipeWriter, prefix string, quiet bool) io.Writer {
if quiet {
return writer
}
return &common.PrefixingWriter{
Prefix: []byte(prefix),
Writer: writer,
}
}
func streamLogsFromRequest(ctx context.Context, request rest.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream(ctx)
if err != nil {
return err
}
defer readCloser.Close()
r := bufio.NewReader(readCloser)
for {
bytes, err := r.ReadBytes('\n')
if _, err := out.Write(bytes); err != nil {
return err
}
buffer := bufio.NewReader(podLogs)
go func(ctx context.Context, buffer *bufio.Reader, prettyText func(a ...interface{}) string, ch chan bool) {
defer func() {
ch <- true
}()
for {
select {
case <-ctx.Done(): // if cancel() execute
ch <- true
return
default:
str, readErr := buffer.ReadString('\n')
if readErr == io.EOF {
break
}
if str == "" {
continue
}
if !input.Quiet {
str = fmt.Sprintf("%s %s", dynoText(fmt.Sprintf("app[%s]:", podName)), str)
}
_, err := fmt.Print(str)
if err != nil {
return
}
}
if err != nil {
if err != io.EOF {
return err
}
}(ctx, buffer, dynoText, ch)
return nil
}
}
<-ch
return nil
}

View File

@@ -817,12 +817,12 @@ func TriggerSchedulerLogs(scheduler string, appName string, processType string,
}
return clientset.StreamLogs(context.Background(), StreamLogsInput{
DeploymentName: appName,
Namespace: getComputedNamespace(appName),
ContainerName: processType,
TailLines: numLines,
Follow: tail,
Quiet: quiet,
Namespace: getComputedNamespace(appName),
ContainerName: processType,
LabelSelector: []string{fmt.Sprintf("app.kubernetes.io/part-of=%s", appName)},
TailLines: numLines,
Follow: tail,
Quiet: quiet,
})
}
@@ -1168,7 +1168,7 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
}
batchJobSelector := fmt.Sprintf("batch.kubernetes.io/job-name=%s", createdJob.Name)
pods, err := waitForPodToExist(ctx, WaitForPodToExistInput{
_, err = waitForPodToExist(ctx, WaitForPodToExistInput{
Clientset: clientset,
Namespace: namespace,
RetryCount: 3,
@@ -1203,6 +1203,18 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
return fmt.Errorf("Error completed pod: %w", err)
}
selectedPod := pods[0]
err := clientset.StreamLogs(ctx, StreamLogsInput{
ContainerName: processType,
Follow: false,
LabelSelector: []string{batchJobSelector},
Namespace: namespace,
Quiet: true,
SinceSeconds: 10,
})
if err != nil {
return fmt.Errorf("Error streaming logs: %w", err)
}
if selectedPod.Status.Phase == v1.PodFailed {
for _, status := range selectedPod.Status.ContainerStatuses {
if status.Name != fmt.Sprintf("%s-%s", appName, processType) {
@@ -1216,13 +1228,14 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
return fmt.Errorf("Unable to attach as the pod has already exited with a failed exit code")
} else if selectedPod.Status.Phase == v1.PodSucceeded {
return errors.New("Unable to attach as the pod has already exited with a successful exit code")
return nil
}
}
return fmt.Errorf("Error waiting for pod to be running: %w", err)
}
pods, err = clientset.ListPods(ctx, ListPodsInput{
pods, err := clientset.ListPods(ctx, ListPodsInput{
Namespace: namespace,
LabelSelector: batchJobSelector,
})
@@ -1250,11 +1263,12 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
}
case v1.PodRunning:
return enterPod(ctx, EnterPodInput{
Clientset: clientset,
Command: command,
Entrypoint: entrypoint,
SelectedPod: selectedPod,
WaitTimeout: 10,
AllowCompletion: true,
Clientset: clientset,
Command: command,
Entrypoint: entrypoint,
SelectedPod: selectedPod,
WaitTimeout: 10,
})
default:
return fmt.Errorf("Unable to attach as the pod is in an unknown state: %s", selectedPod.Status.Phase)