1414package writeback
1515
1616import (
17+ "context"
1718 "fmt"
1819 "os"
1920 "time"
@@ -25,6 +26,8 @@ import (
2526 "github.com/uber/kraken/lib/store/metadata"
2627 "github.com/uber/kraken/utils/closers"
2728 "github.com/uber/kraken/utils/log"
29+
30+ "go.opentelemetry.io/otel/trace"
2831)
2932
3033// FileStore defines store operations required for write-back.
@@ -44,8 +47,8 @@ type Executor struct {
4447func NewExecutor (
4548 stats tally.Scope ,
4649 fs FileStore ,
47- backends * backend.Manager ) * Executor {
48-
50+ backends * backend.Manager ,
51+ ) * Executor {
4952 stats = stats .Tagged (map [string ]string {
5053 "module" : "writebackexecutor" ,
5154 })
@@ -65,33 +68,91 @@ func (e *Executor) Exec(r persistedretry.Task) error {
6568 if ! ok {
6669 return fmt .Errorf ("expected *Task, got %T" , r )
6770 }
68- if err := e .upload (t ); err != nil {
71+
72+ // Extract context from task for trace propagation
73+ // Tasks from public endpoints will have trace context, internal tasks may not
74+ ctx := e .getContextFromTask (t )
75+
76+ log .WithTraceContext (ctx ).With (
77+ "namespace" , t .Namespace ,
78+ "name" , t .Name ,
79+ "has_trace_context" , t .HasTraceContext (),
80+ ).Debug ("Executing writeback task" )
81+
82+ if err := e .upload (ctx , t ); err != nil {
83+ log .WithTraceContext (ctx ).With (
84+ "namespace" , t .Namespace ,
85+ "name" , t .Name ,
86+ "error" , err ,
87+ ).Error ("Failed to upload during writeback" )
6988 return err
7089 }
90+
7191 err := e .fs .DeleteCacheFileMetadata (t .Name , & metadata.Persist {})
7292 if err != nil && ! os .IsNotExist (err ) {
93+ log .WithTraceContext (ctx ).With (
94+ "namespace" , t .Namespace ,
95+ "name" , t .Name ,
96+ "error" , err ,
97+ ).Error ("Failed to delete persist metadata" )
7398 return fmt .Errorf ("delete persist metadata: %s" , err )
7499 }
100+
101+ log .WithTraceContext (ctx ).With (
102+ "namespace" , t .Namespace ,
103+ "name" , t .Name ,
104+ ).Debug ("Successfully completed writeback task" )
105+
75106 return nil
76107}
77108
78- func (e * Executor ) upload (t * Task ) error {
109+ // getContextFromTask extracts trace context from a task if available.
110+ // Returns context.Background() if no trace context is present.
111+ func (e * Executor ) getContextFromTask (t * Task ) context.Context {
112+ if ! t .HasTraceContext () {
113+ return context .Background ()
114+ }
115+
116+ spanCtx := t .SpanContext ()
117+ if ! spanCtx .IsValid () {
118+ return context .Background ()
119+ }
120+
121+ // Create a context with the span context for logging correlation
122+ return trace .ContextWithSpanContext (context .Background (), spanCtx )
123+ }
124+
125+ func (e * Executor ) upload (ctx context.Context , t * Task ) error {
79126 start := time .Now ()
80127
81- log .With ("namespace" , t .Namespace , "name" , t .Name ).Info ("Uploading cache file to the remote backend" )
128+ log .WithTraceContext (ctx ).With (
129+ "namespace" , t .Namespace ,
130+ "name" , t .Name ,
131+ ).Info ("Uploading cache file to the remote backend" )
132+
82133 client , err := e .backends .GetClient (t .Namespace )
83134 if err != nil {
84135 if err == backend .ErrNamespaceNotFound {
85- log .With (
136+ log .WithTraceContext ( ctx ). With (
86137 "namespace" , t .Namespace ,
87- "name" , t .Name ).Info ("Dropping writeback for unconfigured namespace" )
138+ "name" , t .Name ,
139+ ).Info ("Dropping writeback for unconfigured namespace" )
88140 return nil
89141 }
142+ log .WithTraceContext (ctx ).With (
143+ "namespace" , t .Namespace ,
144+ "name" , t .Name ,
145+ "error" , err ,
146+ ).Error ("Failed to get backend client" )
90147 return fmt .Errorf ("get client: %s" , err )
91148 }
92149
93150 if _ , err := client .Stat (t .Namespace , t .Name ); err == nil {
94151 // File already uploaded, no-op.
152+ log .WithTraceContext (ctx ).With (
153+ "namespace" , t .Namespace ,
154+ "name" , t .Name ,
155+ ).Debug ("File already exists in backend, skipping upload" )
95156 return nil
96157 }
97158
@@ -100,17 +161,39 @@ func (e *Executor) upload(t *Task) error {
100161 if os .IsNotExist (err ) {
101162 // Nothing we can do about this but make noise and drop the task.
102163 e .stats .Counter ("missing_files" ).Inc (1 )
103- log .With ("name" , t .Name ).Error ("Invariant violation: writeback cache file missing" )
164+ log .WithTraceContext (ctx ).With (
165+ "namespace" , t .Namespace ,
166+ "name" , t .Name ,
167+ ).Error ("Invariant violation: writeback cache file missing" )
104168 return nil
105169 }
170+ log .WithTraceContext (ctx ).With (
171+ "namespace" , t .Namespace ,
172+ "name" , t .Name ,
173+ "error" , err ,
174+ ).Error ("Failed to get cache file reader" )
106175 return fmt .Errorf ("get file: %s" , err )
107176 }
108177 defer closers .Close (f )
109178
179+ log .WithTraceContext (ctx ).With (
180+ "namespace" , t .Namespace ,
181+ "name" , t .Name ,
182+ ).Debug ("Starting backend upload" )
183+
110184 if err := client .Upload (t .Namespace , t .Name , f ); err != nil {
185+ log .WithTraceContext (ctx ).With (
186+ "namespace" , t .Namespace ,
187+ "name" , t .Name ,
188+ "error" , err ,
189+ ).Error ("Backend upload failed" )
111190 return fmt .Errorf ("upload: %s" , err )
112191 }
113- log .With ("namespace" , t .Namespace , "name" , t .Name ).Info ("Uploaded cache file to remote backend" )
192+
193+ log .WithTraceContext (ctx ).With (
194+ "namespace" , t .Namespace ,
195+ "name" , t .Name ,
196+ ).Info ("Uploaded cache file to remote backend" )
114197
115198 // We don't want to time noops nor errors.
116199 e .stats .Timer ("upload" ).Record (time .Since (start ))
0 commit comments