event package - github.com/AliceO2Group/Control/common/event - Go Packages
Package event provides event publishing and streaming functionality for O² Control components, supporting Kafka and dummy event writers.
- Variables
- type AgentFailedEvent
- type AnnounceTaskPIDEvent
- func (e *AnnounceTaskPIDEvent) GetLabels() map[string]string
- func (e *AnnounceTaskPIDEvent) GetName() string
- func (e *AnnounceTaskPIDEvent) GetTaskId() string
- func (e *AnnounceTaskPIDEvent) GetTaskPID() int
- func (e *AnnounceTaskPIDEvent) GetTimestamp() string
- func (e *AnnounceTaskPIDEvent) SetLabels(labels map[string]string)
- type BasicTaskTerminated
- type DeviceEvent
- type DeviceEventBase
- type DeviceEventOrigin
- type DummyReader
- type DummyWriter
- type EndOfStream
- type EnvironmentEvent
- func (e *EnvironmentEvent) GetError() string
- func (e *EnvironmentEvent) GetLabels() map[string]string
- func (e *EnvironmentEvent) GetMessage() string
- func (e *EnvironmentEvent) GetName() string
- func (e *EnvironmentEvent) GetRun() uint32
- func (e *EnvironmentEvent) GetState() string
- func (e *EnvironmentEvent) GetTimestamp() string
- func (e *EnvironmentEvent) SetLabels(labels map[string]string)
- type Event
- type ExecutorFailedEvent
- type FifoBuffer
- type HasEnvID
- type IntegratedServiceEvent
- type IntegratedServiceEventBase
- type KafkaReader
- type KafkaWriter
- type Reader
- type RoleEvent
- func (e *RoleEvent) GetLabels() map[string]string
- func (r *RoleEvent) GetName() string
- func (r *RoleEvent) GetRolePath() string
- func (r *RoleEvent) GetState() string
- func (r *RoleEvent) GetStatus() string
- func (e *RoleEvent) GetTimestamp() string
- func (e *RoleEvent) SetLabels(labels map[string]string)
- type TaskEvent
- func (r *TaskEvent) GetClassName() string
- func (r *TaskEvent) GetHostname() string
- func (e *TaskEvent) GetLabels() map[string]string
- func (r *TaskEvent) GetName() string
- func (r *TaskEvent) GetState() string
- func (r *TaskEvent) GetStatus() string
- func (r *TaskEvent) GetTaskID() string
- func (e *TaskEvent) GetTimestamp() string
- func (e *TaskEvent) SetLabels(labels map[string]string)
- type TaskInternalError
- type TasksReleasedEvent
- func (tr *TasksReleasedEvent) GetEnvironmentId() uid.ID
- func (e *TasksReleasedEvent) GetLabels() map[string]string
- func (tr *TasksReleasedEvent) GetName() string
- func (tr *TasksReleasedEvent) GetTaskIds() []string
- func (tr *TasksReleasedEvent) GetTaskReleaseErrors() map[string]error
- func (e *TasksReleasedEvent) GetTimestamp() string
- func (e *TasksReleasedEvent) SetLabels(labels map[string]string)
- type TasksStateChangedEvent
- func (tr *TasksStateChangedEvent) GetEnvironmentId() uid.ID
- func (e *TasksStateChangedEvent) GetLabels() map[string]string
- func (tr *TasksStateChangedEvent) GetName() string
- func (tr *TasksStateChangedEvent) GetTaskIds() []string
- func (tr *TasksStateChangedEvent) GetTasksStateChangedError() error
- func (e *TasksStateChangedEvent) GetTimestamp() string
- func (e *TasksStateChangedEvent) SetLabels(labels map[string]string)
- type Writer
This section is empty.
var ( KAFKAWRITER = "kafka_writer" KAFKAPREPARE = "kafka_prepare" )
This section is empty.
func (e *AgentFailedEvent) GetTimestamp() string
func (e *AnnounceTaskPIDEvent) GetTaskPID() int
func (e *AnnounceTaskPIDEvent) GetTimestamp() string
type BasicTaskTerminated struct {
DeviceEventBase
ExitCode int `json:"exitCode"`
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
VoluntaryTermination bool `json:"voluntaryTermination"`
FinalMesosState mesos.TaskState `json:"finalMesosState"`
}
func (e *BasicTaskTerminated) GetTimestamp() string
type DeviceEvent interface {
Event
GetOrigin() DeviceEventOrigin
GetType() pb.DeviceEventType
}
func NewDeviceEvent(origin DeviceEventOrigin, t pb.DeviceEventType) (de DeviceEvent)
type DeviceEventBase struct {
Type pb.DeviceEventType `json:"type"`
Origin DeviceEventOrigin `json:"origin"`
}
func (b *DeviceEventBase) GetOrigin() DeviceEventOrigin
func (e *DeviceEventBase) GetTimestamp() string
func (b *DeviceEventBase) GetType() pb.DeviceEventType
type DeviceEventOrigin struct {
AgentId mesos.AgentID `json:"agentId"`
ExecutorId mesos.ExecutorID `json:"executorId"`
TaskId mesos.TaskID `json:"taskId"`
}
type DummyReader struct{}
DummyReader is an implementation of Reader that returns no events.
type DummyWriter struct{}
func (*DummyWriter) Close()
func (*DummyWriter) WriteEvent(interface{})
func (*DummyWriter) WriteEventWithTimestamp(interface{}, time.Time)
type EndOfStream struct {
DeviceEventBase
}
func (e *EndOfStream) GetTimestamp() string
func (e *EnvironmentEvent) GetTimestamp() string
type ExecutorFailedEvent struct {
ExecutorId mesos.ExecutorID `json:"executorId"`
}
func (e *ExecutorFailedEvent) GetTimestamp() string
type FifoBuffer[T any] struct { }
This structure is meant to be used as a thread-safe FIFO with built-in waiting for new data in its Pop and PopMultiple functions. It is meant to be used from multiple goroutines; using it synchronously is a waste of synchronization mechanisms.
func NewFifoBuffer[T any]() (result FifoBuffer[T])
func (this *FifoBuffer[T]) Length() int
func (this *FifoBuffer[T]) PopMultiple(numberToPop uint) (result []T)
Blocks until the internal buffer contains at least one value.
func (this *FifoBuffer[T]) Push(value T)
func (this *FifoBuffer[T]) ReleaseGoroutines()
type HasEnvID interface {
GetEnvironmentId() string
}
type IntegratedServiceEventBase struct {
ServiceName string `json:"serviceName"`
}
func (e *IntegratedServiceEventBase) GetTimestamp() string
KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
Last fetches the last available message on the topic (considering all partitions). If multiple partitions have data, the event with the greatest message timestamp is returned.
Next blocks until the next event is available or ctx is cancelled.
Kafka writer is used to convert events from events.proto into Kafka messages and to write them. It is built with 2 workers:
#1 gathers kafka.Message values that any goroutine sends into a buffered channel and puts them into the FifoBuffer. #2 pops messages from the FifoBuffer and sends them to Kafka. The reason for this setup, rather than setting Async: true in kafka.Writer, is the ability to have some error handling for failed messages. Moreover, if we used only one worker that gathered messages from the channel and then sent them directly to Kafka, we would block a whole core whenever we received a lot of messages at once. So we split the functionality into two workers: the first puts all messages into the buffer, so if we have a lot of messages the buffer just grows without blocking a whole core, and the second does all the sending. This setup allows us to gather messages from any number of goroutines without blocking or losing messages. Another benefit is that messages are batched instead of being written one by one.
func (w *KafkaWriter) Close()
func (w *KafkaWriter) WriteEvent(e interface{})
func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time)
Reader interface provides methods to read events.
func (e *RoleEvent) GetTimestamp() string
func (e *TaskEvent) GetTimestamp() string
type TaskInternalError struct {
DeviceEventBase
}
func (e *TaskInternalError) GetTimestamp() string
type TasksReleasedEvent struct {
EnvironmentId uid.ID `json:"environmentId"`
TaskIdsReleased []string `json:"taskIdsReleased"`
TaskReleaseErrors map[string]error `json:"taskReleaseErrors"`
}
func (tr *TasksReleasedEvent) GetEnvironmentId() uid.ID
func (e *TasksReleasedEvent) GetTimestamp() string
type TasksStateChangedEvent struct {
EnvironmentId uid.ID `json:"environmentId"`
TaskIdsStateChanged []string `json:"taskIdsStatesChanged"`
TaskStateChangedErr error `json:"taskStateChangedErr"`
}
func NewTasksStateChangedEvent(envId uid.ID, taskIdsChangeState []string, taskStateChangedErr error) (tr *TasksStateChangedEvent)
func (tr *TasksStateChangedEvent) GetEnvironmentId() uid.ID
func (tr *TasksStateChangedEvent) GetTasksStateChangedError() error
func (e *TasksStateChangedEvent) GetTimestamp() string